diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index 29163e7f3054627cd5df711a7f0225ca46ae3fe3..f86fd20e591909670c8c988e3f759774aa989911 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -167,39 +167,39 @@ private[spark] object SSLOptions extends Logging {
   def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = {
     val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))
 
-    val port = conf.getOption(s"$ns.port").map(_.toInt)
+    val port = conf.getWithSubstitution(s"$ns.port").map(_.toInt)
     port.foreach { p =>
       require(p >= 0, "Port number must be a non-negative value.")
     }
 
-    val keyStore = conf.getOption(s"$ns.keyStore").map(new File(_))
+    val keyStore = conf.getWithSubstitution(s"$ns.keyStore").map(new File(_))
         .orElse(defaults.flatMap(_.keyStore))
 
-    val keyStorePassword = conf.getOption(s"$ns.keyStorePassword")
+    val keyStorePassword = conf.getWithSubstitution(s"$ns.keyStorePassword")
         .orElse(defaults.flatMap(_.keyStorePassword))
 
-    val keyPassword = conf.getOption(s"$ns.keyPassword")
+    val keyPassword = conf.getWithSubstitution(s"$ns.keyPassword")
         .orElse(defaults.flatMap(_.keyPassword))
 
-    val keyStoreType = conf.getOption(s"$ns.keyStoreType")
+    val keyStoreType = conf.getWithSubstitution(s"$ns.keyStoreType")
         .orElse(defaults.flatMap(_.keyStoreType))
 
     val needClientAuth =
       conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth))
 
-    val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
+    val trustStore = conf.getWithSubstitution(s"$ns.trustStore").map(new File(_))
         .orElse(defaults.flatMap(_.trustStore))
 
-    val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
+    val trustStorePassword = conf.getWithSubstitution(s"$ns.trustStorePassword")
         .orElse(defaults.flatMap(_.trustStorePassword))
 
-    val trustStoreType = conf.getOption(s"$ns.trustStoreType")
+    val trustStoreType = conf.getWithSubstitution(s"$ns.trustStoreType")
         .orElse(defaults.flatMap(_.trustStoreType))
 
-    val protocol = conf.getOption(s"$ns.protocol")
+    val protocol = conf.getWithSubstitution(s"$ns.protocol")
         .orElse(defaults.flatMap(_.protocol))
 
-    val enabledAlgorithms = conf.getOption(s"$ns.enabledAlgorithms")
+    val enabledAlgorithms = conf.getWithSubstitution(s"$ns.enabledAlgorithms")
         .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet)
         .orElse(defaults.map(_.enabledAlgorithms))
         .getOrElse(Set.empty)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index de2f475c6895f3448d1807db91de9b9178dbce2a..715cfdcc8f4ef478602481dd72a80253a4adc738 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -373,6 +373,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
   }
 
+  /** Get an optional value, applying variable substitution. */
+  private[spark] def getWithSubstitution(key: String): Option[String] = {
+    getOption(key).map(reader.substitute(_))
+  }
+
   /** Get all parameters as a list of pairs */
   def getAll: Array[(String, String)] = {
     settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toArray
diff --git a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
index 6fc7cea6ee94a78d506e133edf4aef1d4ef75417..8eabc2b3cb958a7018c882a463dd398bdb39d4da 100644
--- a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
@@ -22,6 +22,8 @@ import javax.net.ssl.SSLContext
 
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.util.SparkConfWithEnv
+
 class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
 
   test("test resolving property file as spark conf ") {
@@ -133,4 +135,18 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
     assert(opts.enabledAlgorithms === Set("ABC", "DEF"))
   }
 
+  test("variable substitution") {
+    val conf = new SparkConfWithEnv(Map(
+      "ENV1" -> "val1",
+      "ENV2" -> "val2"))
+
+    conf.set("spark.ssl.enabled", "true")
+    conf.set("spark.ssl.keyStore", "${env:ENV1}")
+    conf.set("spark.ssl.trustStore", "${env:ENV2}")
+
+    val opts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
+    assert(opts.keyStore === Some(new File("val1")))
+    assert(opts.trustStore === Some(new File("val2")))
+  }
+
 }