diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 0745ef47ffdac31ede8703460eabe14f825821a2..99d92b9257b12aa52cd8fe4f564a70a8337fe222 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -297,7 +297,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 
      // If offsets have already been created, we are trying to resume a query.
       val checkpointPath = new Path(checkpointLocation, "offsets")
-      val fs = checkpointPath.getFileSystem(df.sqlContext.sparkContext.hadoopConfiguration)
+      val fs = checkpointPath.getFileSystem(df.sqlContext.sessionState.hadoopConf)
       if (fs.exists(checkpointPath)) {
         throw new AnalysisException(
           s"Unable to resume query written to memory sink. Delete $checkpointPath to start over.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 4c9977c8c7373c99826487a1fef0005bf5fbc2c5..dde139608aa6b91fea9e6ecf0310c0a73b40ddfc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -103,7 +103,8 @@ class SQLContext private[sql](
 
   protected[sql] def sessionState: SessionState = sparkSession.sessionState
   protected[sql] def sharedState: SharedState = sparkSession.sharedState
-  protected[sql] def conf: SQLConf = sparkSession.conf
+  protected[sql] def conf: SQLConf = sessionState.conf
+  protected[sql] def runtimeConf: RuntimeConfig = sparkSession.conf
   protected[sql] def cacheManager: CacheManager = sparkSession.cacheManager
   protected[sql] def listener: SQLListener = sparkSession.listener
   protected[sql] def externalCatalog: ExternalCatalog = sparkSession.externalCatalog
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 3561765642dd89e16809090d70a24c79592f851c..00256bd0bee9b78ecca3d58c965ff522341a6d08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.execution.datasources.{CreateTableUsing, LogicalRelation}
 import org.apache.spark.sql.execution.ui.SQLListener
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal.{RuntimeConfigImpl, SessionState, SharedState}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{DataType, LongType, StructType}
 import org.apache.spark.sql.util.ExecutionListenerManager
@@ -76,8 +76,8 @@ class SparkSession private(
   }
 
   /**
-   * State isolated across sessions, including SQL configurations, temporary tables,
-   * registered functions, and everything else that accepts a [[SQLConf]].
+   * State isolated across sessions, including SQL configurations, temporary tables, registered
+   * functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]].
    */
   @transient
   protected[sql] lazy val sessionState: SessionState = {
@@ -103,7 +103,6 @@ class SparkSession private(
     _wrapped = sqlContext
   }
 
-  protected[sql] def conf: SQLConf = sessionState.conf
   protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
   protected[sql] def listener: SQLListener = sharedState.listener
   protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
@@ -191,6 +190,22 @@ class SparkSession private(
    |  Methods for accessing or mutating configurations  |
    * -------------------------------------------------- */
 
+  @transient private lazy val _conf: RuntimeConfig = {
+    new RuntimeConfigImpl(sessionState.conf, sessionState.hadoopConf)
+  }
+
+  /**
+   * Runtime configuration interface for Spark.
+   *
+   * This is the interface through which the user can get and set all Spark and Hadoop
+   * configurations that are relevant to Spark SQL. When getting the value of a config,
+   * this defaults to the value set in the underlying [[SparkContext]], if any.
+   *
+   * @group config
+   * @since 2.0.0
+   */
+  def conf: RuntimeConfig = _conf
+
   /**
    * Set Spark SQL configuration properties.
    *
@@ -213,7 +228,7 @@ class SparkSession private(
    * @group config
    * @since 2.0.0
    */
-  def getConf(key: String): String = conf.getConfString(key)
+  def getConf(key: String): String = sessionState.conf.getConfString(key)
 
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
@@ -222,7 +237,9 @@ class SparkSession private(
    * @group config
    * @since 2.0.0
    */
-  def getConf(key: String, defaultValue: String): String = conf.getConfString(key, defaultValue)
+  def getConf(key: String, defaultValue: String): String = {
+    sessionState.conf.getConfString(key, defaultValue)
+  }
 
   /**
    * Return all the configuration properties that have been set (i.e. not the default).
@@ -231,7 +248,7 @@ class SparkSession private(
    * @group config
    * @since 2.0.0
    */
-  def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
+  def getAllConfs: immutable.Map[String, String] = sessionState.conf.getAllConfs
 
   /**
    * Set the given Spark SQL configuration property.
@@ -244,7 +261,7 @@ class SparkSession private(
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
    * yet, return `defaultValue` in [[ConfigEntry]].
    */
-  protected[sql] def getConf[T](entry: ConfigEntry[T]): T = conf.getConf(entry)
+  protected[sql] def getConf[T](entry: ConfigEntry[T]): T = sessionState.conf.getConf(entry)
 
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
@@ -252,7 +269,7 @@ class SparkSession private(
    * desired one.
    */
   protected[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
-    conf.getConf(entry, defaultValue)
+    sessionState.conf.getConf(entry, defaultValue)
   }
 
 
@@ -601,7 +618,7 @@ class SparkSession private(
    */
   @Experimental
   def createExternalTable(tableName: String, path: String): DataFrame = {
-    val dataSourceName = conf.defaultDataSourceName
+    val dataSourceName = sessionState.conf.defaultDataSourceName
     createExternalTable(tableName, path, dataSourceName)
   }
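The SparkSession hunks above expose a public RuntimeConfig (`def conf`) backed by the session's SQLConf and its new session-scoped Hadoop Configuration, while the old SQLConf accessors switch to `sessionState.conf`. A usage sketch, assuming a SparkSession instance is already in hand (the Hadoop key value is illustrative only):

    import org.apache.spark.sql.SparkSession

    def demoRuntimeConf(spark: SparkSession): Unit = {
      // SQL settings are stored in SessionState.conf (a SQLConf).
      spark.conf.set("spark.sql.shuffle.partitions", "10")
      assert(spark.conf.get("spark.sql.shuffle.partitions") == "10")

      // Hadoop settings are stored in SessionState.hadoopConf, a session-local copy
      // of SparkContext#hadoopConfiguration, so the shared configuration is untouched.
      spark.conf.setHadoop("fs.defaultFS", "hdfs://namenode:8020")
      assert(spark.conf.getHadoopOption("fs.defaultFS").isDefined)
    }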
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
index 7fa246ba51519d252f84ac897a578b2acdda120e..e6c5351106c478831e3787cd8fed05c1d3aaf73d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
@@ -77,7 +77,7 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
           catalogTable.storage.locationUri.map { p =>
             val path = new Path(p)
             try {
-              val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+              val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
               calculateTableSize(fs, path)
             } catch {
               case NonFatal(e) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index eae8fe897521ca383f0202ed0bf419b05862c2fe..5cac9d879fb82fbefa34843e79199ea40a5bab26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -210,7 +210,7 @@ case class LoadData(
           // Follow Hive's behavior:
          // If no scheme or authority is provided with a non-local inpath,
           // we will use hadoop configuration "fs.default.name".
-          val defaultFSConf = sqlContext.sparkContext.hadoopConfiguration.get("fs.default.name")
+          val defaultFSConf = sqlContext.sessionState.hadoopConf.get("fs.default.name")
           val defaultFS = if (defaultFSConf == null) {
             new URI("")
           } else {
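As the comment in this hunk notes, LoadData borrows the default URI scheme and authority from the Hadoop "fs.default.name" setting, which is now read from the session-scoped configuration. A rough sketch of that resolution logic (a hypothetical helper, not the actual code in tables.scala):

    import java.net.URI
    import org.apache.hadoop.conf.Configuration

    def qualifyInPath(inpath: String, hadoopConf: Configuration): URI = {
      val uri = new URI(inpath)
      val defaultFS =
        Option(hadoopConf.get("fs.default.name")).map(new URI(_)).getOrElse(new URI(""))
      // Fill in whichever of scheme/authority the user-supplied path is missing.
      val scheme = Option(uri.getScheme).getOrElse(defaultFS.getScheme)
      val authority = Option(uri.getAuthority).getOrElse(defaultFS.getAuthority)
      new URI(scheme, authority, uri.getPath, uri.getQuery, uri.getFragment)
    }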
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 07bc8ae148ac8bb457c946fa539593b5cfd6a34a..4e7214ce833793d185747c6d5aec70391cfb33d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -131,7 +131,7 @@ case class DataSource(
     val allPaths = caseInsensitiveOptions.get("path")
     val globbedPaths = allPaths.toSeq.flatMap { path =>
       val hdfsPath = new Path(path)
-      val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+      val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf)
       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
       SparkHadoopUtil.get.globPathIfNecessary(qualified)
     }.toArray
@@ -225,7 +225,7 @@ case class DataSource(
       case Seq(singlePath) =>
         try {
           val hdfsPath = new Path(singlePath)
-          val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf)
           val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
           val res = fs.exists(metadataPath)
           res
@@ -284,7 +284,7 @@ case class DataSource(
         val allPaths = caseInsensitiveOptions.get("path") ++ paths
         val globbedPaths = allPaths.flatMap { path =>
           val hdfsPath = new Path(path)
-          val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf)
           val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
           val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
 
@@ -374,7 +374,7 @@ case class DataSource(
           val path = new Path(caseInsensitiveOptions.getOrElse("path", {
             throw new IllegalArgumentException("'path' is not specified")
           }))
-          val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
           path.makeQualified(fs.getUri, fs.getWorkingDirectory)
         }
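The DataSource hunks repeat one pattern: build a Path, resolve its FileSystem from the session Hadoop configuration, qualify it, then expand globs. A self-contained sketch of that step using plain Hadoop APIs (FileSystem.globStatus stands in here for Spark's internal SparkHadoopUtil.globPathIfNecessary):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    def globbedPaths(pattern: String, hadoopConf: Configuration): Seq[Path] = {
      val hdfsPath = new Path(pattern)
      val fs = hdfsPath.getFileSystem(hadoopConf)
      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
      // globStatus returns null when nothing matches, hence the Option wrapper.
      Option(fs.globStatus(qualified)).toSeq.flatMap(_.map(_.getPath))
    }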
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 95629a9923cd3c0da5ad415e11d2015903a034c5..a636ca2f29a9191d0a7f514f96a92fcd4d97a594 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import java.io.IOException
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
@@ -77,7 +78,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
           s"cannot save to file.")
     }
 
-    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    val hadoopConf = new Configuration(sqlContext.sessionState.hadoopConf)
     val fs = outputPath.getFileSystem(hadoopConf)
     val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index 61ec7ed2b15519fe2b7b50b5f2876716ac201d5d..7d407a7747f2c9c98f441cb548012a633b4e05ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -103,7 +103,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     val csvOptions = new CSVOptions(options)
     val headers = requiredSchema.fields.map(_.name)
 
-    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val conf = new Configuration(sqlContext.sessionState.hadoopConf)
     val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
 
     (file: PartitionedFile) => {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 4063c6ebce4b5ff98ac4b1bc478e10e83cc902a7..731b0047e5245617425b1ba0933f0c3df095fdee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -271,7 +271,7 @@ class HDFSFileCatalog(
     val partitionSchema: Option[StructType])
   extends FileCatalog with Logging {
 
-  private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+  private val hadoopConf = new Configuration(sqlContext.sessionState.hadoopConf)
 
   var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]
   var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 7773ff550fe0a8ab205dc452be246bdf14435e72..580a0e1de6c18958e4a3561a657d00c3b1f8ef5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -98,7 +98,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
-    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val conf = new Configuration(sqlContext.sessionState.hadoopConf)
     val broadcastedConf =
       sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
 
@@ -126,7 +126,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
   }
 
   private def createBaseRdd(sqlContext: SQLContext, inputPaths: Seq[FileStatus]): RDD[String] = {
-    val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
+    val job = Job.getInstance(sqlContext.sessionState.hadoopConf)
     val conf = job.getConfiguration
 
     val paths = inputPaths.map(_.getPath)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index bbbbc5ebe9030eb0632ad8d86293dbefef0b838f..28c6664085d8bec7fb07870ab99548171cb6d6d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -263,7 +263,7 @@ private[sql] class DefaultSource
       requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
-    val parquetConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val parquetConf = new Configuration(sqlContext.sessionState.hadoopConf)
     parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
     parquetConf.set(
       CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
@@ -648,7 +648,7 @@ private[sql] object ParquetRelation extends Logging {
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
     val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat
-    val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
+    val serializedConf = new SerializableConfiguration(sqlContext.sessionState.hadoopConf)
 
     // !! HACK ALERT !!
     //
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index fa0df61ca5f2c771c86953f40c33ca39e99b5383..f7ac1ac8e48cf01fae42dc50c396f55e2fccc137 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -90,7 +90,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
-    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val conf = new Configuration(sqlContext.sessionState.hadoopConf)
     val broadcastedConf =
       sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 4f722a514ba698f1683164d18c51796b3b4f3668..a86108862fe80724fe28adfb5b677db5efc2a975 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -45,7 +45,7 @@ class FileStreamSink(
   private val basePath = new Path(path)
   private val logPath = new Path(basePath, FileStreamSink.metadataDir)
   private val fileLog = new FileStreamSinkLog(sqlContext, logPath.toUri.toString)
-  private val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+  private val fs = basePath.getFileSystem(sqlContext.sessionState.hadoopConf)
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 51c3aee8358ffb89af1141a40c0f173731b9c6f7..aeb64c929c651820163537b955cd68fee133083d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -38,7 +38,7 @@ class FileStreamSource(
     override val schema: StructType,
     dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
 
-  private val fs = new Path(path).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+  private val fs = new Path(path).getFileSystem(sqlContext.sessionState.hadoopConf)
   private val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataPath)
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index b52f7a28b408a6f35a7a0c68842a8906abb6350e..dd6760d341b5e975bf9457dd112d382ef055fdf4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -212,7 +212,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
   }
 
   private def createFileManager(): FileManager = {
-    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    val hadoopConf = new Configuration(sqlContext.sessionState.hadoopConf)
     try {
       new FileContextManager(metadataPath, hadoopConf)
     } catch {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index de4305f5642547d04e3f5ccb5bb0827f45b8207c..d5e4dd8f78ac260809df8006f3939941543ee01f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -63,7 +63,7 @@ case class StateStoreRestoreExec(
       storeVersion = getStateId.batchId,
       keyExpressions.toStructType,
       child.output.toStructType,
-      new StateStoreConf(sqlContext.conf),
+      sqlContext.sessionState,
       Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
         val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
         iter.flatMap { row =>
@@ -92,7 +92,7 @@ case class StateStoreSaveExec(
       storeVersion = getStateId.batchId,
       keyExpressions.toStructType,
       child.output.toStructType,
-      new StateStoreConf(sqlContext.conf),
+      sqlContext.sessionState,
       Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
         new Iterator[InternalRow] {
           private[this] val baseIterator = iter
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
index 95b5129351ff40656a969b707519ff0b58c8fba2..a08a4bb4c361b01703a27cf31ad73b7f5fbba5ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
@@ -30,7 +30,7 @@ class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog
   val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
   logInfo(s"Reading streaming file log from $metadataDirectory")
   val metadataLog = new FileStreamSinkLog(sqlContext, metadataDirectory.toUri.toString)
-  val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+  val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
 
   override def paths: Seq[Path] = path :: Nil
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index d708486d8ea0bd61e716a450d9545d3a22ef989f..635bb866075e3fc3f5aac79dbd4170122d14279b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
@@ -37,13 +38,15 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
     storeVersion: Long,
     keySchema: StructType,
     valueSchema: StructType,
-    storeConf: StateStoreConf,
+    sessionState: SessionState,
     @transient private val storeCoordinator: Option[StateStoreCoordinatorRef])
   extends RDD[U](dataRDD) {
 
+  private val storeConf = new StateStoreConf(sessionState.conf)
+
   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
   private val confBroadcast = dataRDD.context.broadcast(
-    new SerializableConfiguration(dataRDD.context.hadoopConfiguration))
+    new SerializableConfiguration(sessionState.hadoopConf))
 
   override protected def getPartitions: Array[Partition] = dataRDD.partitions
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
index 9b6d0918e29c13bc2b2d4313037284389ce598bf..4914a9d722a83d2e35db474271a95b68170b5bf4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 
 package object state {
@@ -43,7 +44,7 @@ package object state {
         storeVersion,
         keySchema,
         valueSchema,
-        new StateStoreConf(sqlContext.conf),
+        sqlContext.sessionState,
         Some(sqlContext.streams.stateStoreCoordinator))(
         storeUpdateFunction)
     }
@@ -55,7 +56,7 @@ package object state {
         storeVersion: Long,
         keySchema: StructType,
         valueSchema: StructType,
-        storeConf: StateStoreConf,
+        sessionState: SessionState,
         storeCoordinator: Option[StateStoreCoordinatorRef])(
         storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = {
       val cleanedF = dataRDD.sparkContext.clean(storeUpdateFunction)
@@ -67,7 +68,7 @@ package object state {
         storeVersion,
         keySchema,
         valueSchema,
-        storeConf,
+        sessionState,
         storeCoordinator)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
index 058df1e3c19a779fd0ec04e293050622d4d113f9..137323583b3ebd5119f06e1adf52ebd3ccd02dda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
@@ -17,20 +17,21 @@
 
 package org.apache.spark.sql.internal
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.sql.RuntimeConfig
 
+
 /**
  * Implementation for [[RuntimeConfig]].
  */
-class RuntimeConfigImpl extends RuntimeConfig {
-
-  private val conf = new SQLConf
-
-  private val hadoopConf = java.util.Collections.synchronizedMap(
-    new java.util.HashMap[String, String]())
+class RuntimeConfigImpl(
+    sqlConf: SQLConf = new SQLConf,
+    hadoopConf: Configuration = new Configuration)
+  extends RuntimeConfig {
 
   override def set(key: String, value: String): RuntimeConfig = {
-    conf.setConfString(key, value)
+    sqlConf.setConfString(key, value)
     this
   }
 
@@ -39,7 +40,7 @@ class RuntimeConfigImpl extends RuntimeConfig {
   override def set(key: String, value: Long): RuntimeConfig = set(key, value.toString)
 
   @throws[NoSuchElementException]("if the key is not set")
-  override def get(key: String): String = conf.getConfString(key)
+  override def get(key: String): String = sqlConf.getConfString(key)
 
   override def getOption(key: String): Option[String] = {
     try Option(get(key)) catch {
@@ -47,27 +48,26 @@ class RuntimeConfigImpl extends RuntimeConfig {
     }
   }
 
-  override def unset(key: String): Unit = conf.unsetConf(key)
+  override def unset(key: String): Unit = sqlConf.unsetConf(key)
 
-  override def setHadoop(key: String, value: String): RuntimeConfig = {
-    hadoopConf.put(key, value)
+  override def setHadoop(key: String, value: String): RuntimeConfig = hadoopConf.synchronized {
+    hadoopConf.set(key, value)
     this
   }
 
   @throws[NoSuchElementException]("if the key is not set")
   override def getHadoop(key: String): String = hadoopConf.synchronized {
-    if (hadoopConf.containsKey(key)) {
-      hadoopConf.get(key)
-    } else {
+    Option(hadoopConf.get(key)).getOrElse {
       throw new NoSuchElementException(key)
     }
   }
 
-  override def getHadoopOption(key: String): Option[String] = {
-    try Option(getHadoop(key)) catch {
-      case _: NoSuchElementException => None
-    }
+  override def getHadoopOption(key: String): Option[String] = hadoopConf.synchronized {
+    Option(hadoopConf.get(key))
+  }
+
+  override def unsetHadoop(key: String): Unit = hadoopConf.synchronized {
+    hadoopConf.unset(key)
   }
 
-  override def unsetHadoop(key: String): Unit = hadoopConf.remove(key)
 }
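With this change RuntimeConfigImpl keeps no private maps of its own; it delegates to whichever SQLConf and Hadoop Configuration it is constructed with (fresh ones by default). A small REPL-style sketch of the resulting behavior, using made-up keys:

    import org.apache.spark.sql.internal.RuntimeConfigImpl

    // Default constructor arguments: a fresh SQLConf and a fresh Hadoop Configuration.
    val rc = new RuntimeConfigImpl()
    rc.set("my.sql.key", "v").setHadoop("my.hadoop.key", "w")   // both setters chain
    assert(rc.getOption("my.sql.key") == Some("v"))
    assert(rc.getHadoop("my.hadoop.key") == "w")
    rc.unsetHadoop("my.hadoop.key")
    assert(rc.getHadoopOption("my.hadoop.key").isEmpty)
    // rc.getHadoop("my.hadoop.key") would now throw NoSuchElementException.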
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index f683bbbeb5a9b1b60245d0a1606dcb967d5dfe50..04ad729659b61f870aa083a2c760d18ee8f2c962 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -21,6 +21,8 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
@@ -46,6 +48,7 @@ private[sql] class SessionState(ctx: SQLContext) {
    * SQL-specific key-value configurations.
    */
   lazy val conf: SQLConf = new SQLConf
+  lazy val hadoopConf: Configuration = new Configuration(ctx.sparkContext.hadoopConfiguration)
 
  // Automatically extract `spark.sql.*` entries and put them in our SQLConf
   setConf(SQLContext.getSQLProperties(ctx.sparkContext.getConf))
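The new SessionState.hadoopConf is a copy of the SparkContext's Hadoop configuration, which is what gives each session its isolation (and what the SQLContextSuite test below asserts). A tiny sketch of the copy-constructor semantics this relies on, with an illustrative key:

    import org.apache.hadoop.conf.Configuration

    val shared = new Configuration()
    shared.set("my.special.key", "from-spark-context")
    val session = new Configuration(shared)          // what SessionState.hadoopConf does
    session.set("my.special.key", "session-local")
    assert(shared.get("my.special.key") == "from-spark-context")   // shared copy untouched
    assert(session.get("my.special.key") == "session-local")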
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index 2f62ad4850deed5b5eb45e6c228ac8801b45dfca..d49cc103b55b38827489298d8d4af150d3d897dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -85,4 +85,27 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
     assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love")
   }
 
+  test("Hadoop conf interaction between SQLContext and SparkContext") {
+    val mySpecialKey = "mai.special.key"
+    val mySpecialValue = "msv"
+    try {
+      sc.hadoopConfiguration.set(mySpecialKey, mySpecialValue)
+      val sqlContext = SQLContext.getOrCreate(sc)
+      val sessionState = sqlContext.sessionState
+      assert(sessionState.hadoopConf.get(mySpecialKey) === mySpecialValue)
+      assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === mySpecialValue)
+      // mutating hadoop conf in SQL doesn't mutate the underlying one
+      sessionState.hadoopConf.set(mySpecialKey, "no no no")
+      assert(sessionState.hadoopConf.get(mySpecialKey) === "no no no")
+      assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === "no no no")
+      assert(sc.hadoopConfiguration.get(mySpecialKey) === mySpecialValue)
+      sqlContext.runtimeConf.setHadoop(mySpecialKey, "yes yes yes")
+      assert(sessionState.hadoopConf.get(mySpecialKey) === "yes yes yes")
+      assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === "yes yes yes")
+      assert(sc.hadoopConfiguration.get(mySpecialKey) === mySpecialValue)
+    } finally {
+      sc.hadoopConfiguration.unset(mySpecialKey)
+    }
+  }
+
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 581095d3dc1c2e80e29c35b74958e2199207343e..0aab36ae38401332c169412ebfb2357c68fb48d4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -113,7 +113,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
     withTempPath { location =>
       val path = new Path(location.getCanonicalPath)
-      val conf = sparkContext.hadoopConfiguration
+      val conf = new Configuration(sqlContext.sessionState.hadoopConf)
       writeMetadata(parquetSchema, path, conf)
       readParquetFile(path.toString)(df => {
         val sparkTypes = df.schema.map(_.dataType)
@@ -250,7 +250,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
     withTempPath { location =>
       val path = new Path(location.getCanonicalPath)
-      val conf = sparkContext.hadoopConfiguration
+      val conf = new Configuration(sqlContext.sessionState.hadoopConf)
       writeMetadata(parquetSchema, path, conf)
       val errorMessage = intercept[Throwable] {
         sqlContext.read.parquet(path.toString).printSchema()
@@ -271,7 +271,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
     withTempPath { location =>
       val path = new Path(location.getCanonicalPath)
-      val conf = sparkContext.hadoopConfiguration
+      val conf = new Configuration(sqlContext.sessionState.hadoopConf)
       writeMetadata(parquetSchema, path, conf)
       val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType)
       assert(sparkTypes === expectedSparkTypes)
@@ -431,7 +431,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     withTempPath { location =>
       val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
       val path = new Path(location.getCanonicalPath)
-      val conf = sparkContext.hadoopConfiguration
+      val conf = new Configuration(sqlContext.sessionState.hadoopConf)
       writeMetadata(parquetSchema, path, conf, extraMetadata)
 
       readParquetFile(path.toString) { df =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 70c2a82990ba961f792ab38ad309fc10c47ccc57..a164f4c733b8498e3dbd63b440a1de9d001bb1d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -217,7 +217,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
       SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
       SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0") {
       withFileStreamSinkLog { sinkLog =>
-        val fs = sinkLog.metadataPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        val fs = sinkLog.metadataPath.getFileSystem(sqlContext.sessionState.hadoopConf)
 
         def listBatchFiles(): Set[String] = {
           fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 13281427045c522bc8c34be37369255c51a0d9e2..22e011cfb726c7603ec4513aa8e44e7b26348ffe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -82,7 +82,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
-    sqlContext.sparkContext.hadoopConfiguration.set(
+    sqlContext.sessionState.hadoopConf.set(
       s"fs.$scheme.impl",
       classOf[FakeFileSystem].getName)
     withTempDir { temp =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 5691105235f5b9ef0f1dfdb79686e91856ff8d88..fcfac359f373383da95a676918ba3cbd92e302c1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -88,7 +88,7 @@ private[sql] trait SQLTestUtils
    * The Hadoop configuration used by the active [[SQLContext]].
    */
   protected def hadoopConfiguration: Configuration = {
-    sparkContext.hadoopConfiguration
+    sqlContext.sessionState.hadoopConf
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 431ac8e2c8a33cf9dd93adadedc6966abb24e5a1..d270775af652283527b936873c065bc2f3f362e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.sql.test
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession, SQLContext}
+import org.apache.spark.sql.internal.{RuntimeConfigImpl, SessionState, SQLConf}
 
 /**
  * A special [[SQLContext]] prepared for testing.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 13f29e08fb07d4a71fb75b687fece4fb3d515ca3..edb87b94ea93f5f0a51b60829dae32baf0bf7b7b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -544,7 +544,7 @@ private[hive] class MetaStoreFileCatalog(
   extends HDFSFileCatalog(ctx, Map.empty, paths, Some(partitionSpecFromHive.partitionColumns)) {
 
   override def getStatus(path: Path): Array[FileStatus] = {
-    val fs = path.getFileSystem(ctx.sparkContext.hadoopConfiguration)
+    val fs = path.getFileSystem(ctx.sessionState.hadoopConf)
     fs.listStatus(path)
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 6457a904eb1859010e691f48ea03c6f546a998ae..bf0288c9f7eb79424a11550608b2976aa84caf6d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -223,6 +223,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
     conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
+  // TODO: why do we get this from SparkConf but not SQLConf?
   def hiveThriftServerSingleSession: Boolean = {
     ctx.sparkContext.conf.getBoolean(
       "spark.sql.hive.thriftServer.singleSession", defaultValue = false)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 4250a873412b0ec7e618817b0a05bc31d93590f8..1095f5fd95819689f5902197b2bab86736af9c05 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -57,7 +57,7 @@ private[sql] class DefaultSource
       files: Seq[FileStatus]): Option[StructType] = {
     OrcFileOperator.readSchema(
       files.map(_.getPath.toUri.toString),
-      Some(sqlContext.sparkContext.hadoopConfiguration)
+      Some(new Configuration(sqlContext.sessionState.hadoopConf))
     )
   }
 
@@ -115,7 +115,7 @@ private[sql] class DefaultSource
       requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
-    val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val orcConf = new Configuration(sqlContext.sessionState.hadoopConf)
 
     if (sqlContext.conf.orcFilterPushDown) {
       // Sets pushed predicates
@@ -278,7 +278,7 @@ private[orc] case class OrcTableScan(
   with HiveInspectors {
 
   def execute(): RDD[InternalRow] = {
-    val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
+    val job = Job.getInstance(new Configuration(sqlContext.sessionState.hadoopConf))
     val conf = job.getConfiguration
 
     // Tries to push down filters if ORC filter push-down is enabled
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index bf099e09e3e07845151d4f44878706854481aa81..04b2494043b78a74a5b649a47549197d8b7b31e0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
@@ -43,7 +43,7 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.{CacheTableCommand, HiveNativeCommand}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{RuntimeConfigImpl, SQLConf}
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 // SPARK-3729: Test key required to check for initialization errors with config.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 68244cdb11cb8731b6e32b991bd60f1ed7ea5ec9..5965cdc81c128a3247ad3b86ac6cfb8107b1058b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -374,7 +374,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       val expectedPath =
         sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
       val filesystemPath = new Path(expectedPath)
-      val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
+      val fs = filesystemPath.getFileSystem(sqlContext.sessionState.hadoopConf)
       if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
 
       // It is a managed table when we do not specify the location.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 014c1009ed9ae44cc1728c25e1171616aa47722c..8b4e4dced8c880cfbc3d0f106d3fd11d5cdf5a55 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -239,12 +239,12 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       }
 
       // Unset default URI Scheme and Authority: throw exception
-      val originalFsName = hiveContext.sparkContext.hadoopConfiguration.get("fs.default.name")
-      hiveContext.sparkContext.hadoopConfiguration.unset("fs.default.name")
+      val originalFsName = hiveContext.sessionState.hadoopConf.get("fs.default.name")
+      hiveContext.sessionState.hadoopConf.unset("fs.default.name")
       intercept[AnalysisException] {
         sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
       }
-      hiveContext.sparkContext.hadoopConfiguration.set("fs.default.name", originalFsName)
+      hiveContext.sessionState.hadoopConf.set("fs.default.name", originalFsName)
     }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 206d911e0d8a29b1ff239f6c124956c2293ae572..fd19fcbd4e9c7126fc82da012db485375de7dc29 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -36,7 +36,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     val expectedTablePath =
       hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)
     val filesystemPath = new Path(expectedTablePath)
-    val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
+    val fs = filesystemPath.getFileSystem(hiveContext.sessionState.hadoopConf)
     fs.exists(filesystemPath)
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 2345c1cf9cc09c00306126e0a4a57ca37c9a54a3..f11c055fb997151ac042587369dd4bd16011c7d7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -94,7 +94,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
         .orc(path)
 
       // Check if this is compressed as ZLIB.
-      val conf = sparkContext.hadoopConfiguration
+      val conf = sqlContext.sessionState.hadoopConf
       val fs = FileSystem.getLocal(conf)
       val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
       assert(maybeOrcFile.isDefined)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 4fb78ac02cb55c5b4366b0894a2cc93bb4c1badd..0a0cdf60e8a13f94f83a9e35611c2ce44f38727c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -181,7 +181,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
   // Following codec is supported in hive-0.13.1, ignore it now
   ignore("Other compression options for writing to an ORC file - 0.13.1 and above") {
     val data = (1 to 100).map(i => (i, s"val_$i"))
-    val conf = sparkContext.hadoopConfiguration
+    val conf = sqlContext.sessionState.hadoopConf
 
     conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "SNAPPY")
     withOrcFile(data) { file =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index dad4f87ae39717f6ebf4aee1bd7629707303897e..eced8ed57f87efb416d7ff74843d829e4e765ea3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -74,7 +74,7 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
       inputAttributes.find(_.name == field.name)
     }
 
-    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val conf = new Configuration(sqlContext.sessionState.hadoopConf)
     val broadcastedConf =
       sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 5378336ff8a907fef11e07f8828cf4a3b8300736..3b16468e76fea9df7f291bb34169453abfa18422 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -209,7 +209,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       testDF.write.mode(SaveMode.Ignore).format(dataSourceName).save(file.getCanonicalPath)
 
       val path = new Path(file.getCanonicalPath)
-      val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+      val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
       assert(fs.listStatus(path).isEmpty)
     }
   }
@@ -510,7 +510,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
         s"${file.getCanonicalFile}/p1=2/p2=bar"
       ).map { p =>
         val path = new Path(p)
-        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
         path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
       }