diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index 3c9439b2e9a5225876ccc5f7b405fd7c1fde7e40..e715d9434a2ab349b8414843f52ca2be8e8ddaab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -91,8 +91,8 @@ private[sql] trait CacheManager {
         CachedData(
           planToCache,
           InMemoryRelation(
-            useCompression,
-            columnBatchSize,
+            conf.useCompression,
+            conf.columnBatchSize,
             storageLevel,
             query.queryExecution.executedPlan,
             tableName))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index f5bf935522dada4e59dd859c780227f88c3ac657..206d16f5b34f5400a5215afd833a97329953d095 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -61,7 +61,7 @@ private[spark] object SQLConf {
  *
  * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
  */
-private[sql] trait SQLConf {
+private[sql] class SQLConf {
   import SQLConf._
 
   /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e7021cc3366ca4f94e79b941c0f1a4bdcea4ea26..d8efce0cb43eb5eb0893bb0d7ebe7ab2076991ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql
 
+import java.util.Properties
+
+import scala.collection.immutable
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
@@ -49,7 +52,6 @@ import org.apache.spark.sql.sources.{DataSourceStrategy, BaseRelation, DDLParser
 @AlphaComponent
 class SQLContext(@transient val sparkContext: SparkContext)
   extends org.apache.spark.Logging
-  with SQLConf
   with CacheManager
   with ExpressionConversions
   with UDFRegistration
@@ -57,6 +59,30 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   self =>
 
+  // Note that this is a lazy val so we can override the default value in subclasses.
+  private[sql] lazy val conf: SQLConf = new SQLConf
+
+  /** Set Spark SQL configuration properties. */
+  def setConf(props: Properties): Unit = conf.setConf(props)
+
+  /** Set the given Spark SQL configuration property. */
+  def setConf(key: String, value: String): Unit = conf.setConf(key, value)
+
+  /** Return the value of Spark SQL configuration property for the given key. */
+  def getConf(key: String): String = conf.getConf(key)
+
+  /**
+   * Return the value of Spark SQL configuration property for the given key. If the key is not set
+   * yet, return `defaultValue`.
+   */
+  def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue)
+
+  /**
+   * Return all the configuration properties that have been set (i.e. not the default).
+   * This creates a new copy of the config properties in the form of a Map.
+   */
+  def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
+
   @transient
   protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
 
@@ -212,7 +238,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], schema: StructType): SchemaRDD = {
-    val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
+    val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
     val appliedSchema =
       Option(schema).getOrElse(
         JsonRDD.nullTypeToStringType(
@@ -226,7 +252,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = {
-    val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
+    val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
     val appliedSchema =
       JsonRDD.nullTypeToStringType(
         JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
@@ -299,10 +325,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   def sql(sqlText: String): SchemaRDD = {
-    if (dialect == "sql") {
+    if (conf.dialect == "sql") {
       new SchemaRDD(this, parseSql(sqlText))
     } else {
-      sys.error(s"Unsupported SQL dialect: $dialect")
+      sys.error(s"Unsupported SQL dialect: ${conf.dialect}")
     }
   }
 
@@ -323,9 +349,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
     val sqlContext: SQLContext = self
 
-    def codegenEnabled = self.codegenEnabled
+    def codegenEnabled = self.conf.codegenEnabled
 
-    def numPartitions = self.numShufflePartitions
+    def numPartitions = self.conf.numShufflePartitions
 
     def strategies: Seq[Strategy] =
       extraStrategies ++ (
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 8884204e5079fc4778f51485a15c06e113755a71..7f868cd4afca44f570c73e420cc9331fed13bb0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -52,7 +52,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
    * @group userf
    */
   def sql(sqlText: String): JavaSchemaRDD = {
-    if (sqlContext.dialect == "sql") {
+    if (sqlContext.conf.dialect == "sql") {
       new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlText))
     } else {
       sys.error(s"Unsupported SQL dialect: $sqlContext.dialect")
@@ -164,7 +164,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
    * It goes through the entire dataset once to determine the schema.
    */
   def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = {
-    val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
+    val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
     val appliedScalaSchema =
       JsonRDD.nullTypeToStringType(
         JsonRDD.inferSchema(json.rdd, 1.0, columnNameOfCorruptJsonRecord))
@@ -182,7 +182,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
    */
   @Experimental
   def jsonRDD(json: JavaRDD[String], schema: StructType): JavaSchemaRDD = {
-    val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
+    val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
     val appliedScalaSchema =
       Option(asScalaDataType(schema)).getOrElse(
         JsonRDD.nullTypeToStringType(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 1e432485c4c29bf1e8ac4b617a2f9d18c3f6877d..065fae3c83df1b42654b6a43b3ae55a3c8c345d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -82,7 +82,7 @@ private[sql] case class InMemoryRelation(
     if (batchStats.value.isEmpty) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.
-      Statistics(sizeInBytes = child.sqlContext.defaultSizeInBytes)
+      Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
     } else {
       // Underlying columnar RDD has been materialized, required information has also been collected
       // via the `batchStats` accumulator, compute the final statistics, and update `_statistics`.
@@ -233,7 +233,7 @@ private[sql] case class InMemoryColumnarTableScan(
   val readPartitions = sparkContext.accumulator(0)
   val readBatches = sparkContext.accumulator(0)
 
-  private val inMemoryPartitionPruningEnabled = sqlContext.inMemoryPartitionPruning
+  private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
   override def execute() = {
     readPartitions.setValue(0)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index d7c811ca89022698cd9e28f4cef7c62c62e786c8..7c0b72aab448ebbe231768e2a6b7788b9f4e47a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -123,7 +123,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
  */
 private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPlan] {
   // TODO: Determine the number of partitions.
-  def numPartitions = sqlContext.numShufflePartitions
+  def numPartitions = sqlContext.conf.numShufflePartitions
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     case operator: SparkPlan =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index d2d8cb1c62d406be58e40768bf45a28b3bf124e5..069e950195302c060209fd6b605a0d759e413947 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -69,7 +69,7 @@ case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLCont
   @transient override lazy val statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
-    sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
+    sizeInBytes = BigInt(sqlContext.conf.defaultSizeInBytes)
   )
 }
 
@@ -106,6 +106,6 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQ
   @transient override lazy val statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
-    sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
+    sizeInBytes = BigInt(sqlContext.conf.defaultSizeInBytes)
   )
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 017c78d2c66d4cf58c8316a2f23885763b973c39..6fecd1ff066c39f2f62003eb10b36905223e112b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -51,7 +51,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   // sqlContext will be null when we are being deserialized on the slaves.  In this instance
   // the value of codegenEnabled will be set by the desserializer after the constructor has run.
   val codegenEnabled: Boolean = if (sqlContext != null) {
-    sqlContext.codegenEnabled
+    sqlContext.conf.codegenEnabled
   } else {
     false
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d91b1fbc6983495e402e2321dfad0b863a2978dc..0652d2ff7c9ab87006772aa02826f4924adb8190 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -35,8 +35,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object LeftSemiJoin extends Strategy with PredicateHelper {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right)
-        if sqlContext.autoBroadcastJoinThreshold > 0 &&
-          right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
+        if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+          right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
         val semiJoin = joins.BroadcastLeftSemiJoinHash(
           leftKeys, rightKeys, planLater(left), planLater(right))
         condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil
@@ -81,13 +81,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
-        if sqlContext.autoBroadcastJoinThreshold > 0 &&
-           right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
+        if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+           right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
         makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildRight)
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
-        if sqlContext.autoBroadcastJoinThreshold > 0 &&
-           left.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
+        if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+           left.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
           makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft)
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
@@ -215,7 +215,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
         val prunePushedDownFilters =
-          if (sqlContext.parquetFilterPushDown) {
+          if (sqlContext.conf.parquetFilterPushDown) {
             (predicates: Seq[Expression]) => {
               // Note: filters cannot be pushed down to Parquet if they contain more complex
               // expressions than simple "Attribute cmp Literal" comparisons. Here we remove all
@@ -237,7 +237,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           ParquetTableScan(
             _,
             relation,
-            if (sqlContext.parquetFilterPushDown) filters else Nil)) :: Nil
+            if (sqlContext.conf.parquetFilterPushDown) filters else Nil)) :: Nil
 
       case _ => Nil
     }
@@ -270,7 +270,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.
         execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
-      case logical.Sort(sortExprs, global, child) if sqlContext.externalSortEnabled =>
+      case logical.Sort(sortExprs, global, child) if sqlContext.conf.externalSortEnabled =>
         execution.ExternalSort(sortExprs, global, planLater(child)):: Nil
       case logical.Sort(sortExprs, global, child) =>
         execution.Sort(sortExprs, global, planLater(child)):: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index df8e61615104c9ca320bbd97f54bd9290817ff47..af6b07bd6c2f491bcbab14e10d7808ca11bb0c7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -94,7 +94,7 @@ case class SetCommand(
       logWarning(
         s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
           s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-      Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.numShufflePartitions}"))
+      Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.conf.numShufflePartitions}"))
 
     // Queries a single property.
     case Some((key, None)) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index fbe1d06ed2e829b0cb16c6309a3ff7de768ba96c..2dd22c020ef128c4ebaf86f5a1aed5be4cecde7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -43,7 +43,7 @@ case class BroadcastHashJoin(
   extends BinaryNode with HashJoin {
 
   val timeout = {
-    val timeoutValue = sqlContext.broadcastTimeout
+    val timeoutValue = sqlContext.conf.broadcastTimeout
     if (timeoutValue < 0) {
       Duration.Inf
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index a9a6696cb15e460025fee47af0cfaa1fffa9a07b..f5c02224c82a09a5d41c3615bb57750bb11575ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -59,8 +59,8 @@ private[sql] case class JSONRelation(
       JsonRDD.inferSchema(
         baseRDD,
         samplingRatio,
-        sqlContext.columnNameOfCorruptRecord)))
+        sqlContext.conf.columnNameOfCorruptRecord)))
 
   override def buildScan() =
-    JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord)
+    JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 2835dc3408b962293943bd31c1f65fd9c58a6cd1..cde5160149e9c8aa20235bf65f938f303c8b6dba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -65,7 +65,7 @@ private[sql] case class ParquetRelation(
     ParquetTypesConverter.readSchemaFromFile(
       new Path(path.split(",").head),
       conf,
-      sqlContext.isParquetBinaryAsString)
+      sqlContext.conf.isParquetBinaryAsString)
 
   lazy val attributeMap = AttributeMap(output.map(o => o -> o))
 
@@ -80,7 +80,7 @@ private[sql] case class ParquetRelation(
   }
 
   // TODO: Use data from the footers.
-  override lazy val statistics = Statistics(sizeInBytes = sqlContext.defaultSizeInBytes)
+  override lazy val statistics = Statistics(sizeInBytes = sqlContext.conf.defaultSizeInBytes)
 }
 
 private[sql] object ParquetRelation {
@@ -163,7 +163,8 @@ private[sql] object ParquetRelation {
                   sqlContext: SQLContext): ParquetRelation = {
     val path = checkPath(pathString, allowExisting, conf)
     conf.set(ParquetOutputFormat.COMPRESSION, shortParquetCompressionCodecNames.getOrElse(
-      sqlContext.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED).name())
+      sqlContext.conf.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED)
+      .name())
     ParquetRelation.enableLogForwarding()
     ParquetTypesConverter.writeMetaData(attributes, path, conf)
     new ParquetRelation(path.toString, Some(conf), sqlContext) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
index b4d48902fd2c6960340fc919e1c80d96deb10634..02ce1b3e6d81193b09524dea5b8d97a4e2f6da5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -54,7 +54,7 @@ trait ParquetTest {
     try f finally {
       keys.zip(currentValues).foreach {
         case (key, Some(value)) => setConf(key, value)
-        case (key, None) => unsetConf(key)
+        case (key, None) => conf.unsetConf(key)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 2e0c6c51c00e5c0ab66002e2265b1013eaed06e6..55a2728a85cc76790ae1bb8879ebdef8c168e5f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -137,7 +137,7 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
     ParquetTypesConverter.readSchemaFromFile(
       partitions.head.files.head.getPath,
       Some(sparkContext.hadoopConfiguration),
-      sqlContext.isParquetBinaryAsString))
+      sqlContext.conf.isParquetBinaryAsString))
 
   val dataIncludesKey =
     partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true)
@@ -198,7 +198,7 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
     predicates
       .reduceOption(And)
       .flatMap(ParquetFilters.createFilter)
-      .filter(_ => sqlContext.parquetFilterPushDown)
+      .filter(_ => sqlContext.conf.parquetFilterPushDown)
       .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
 
     def percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 2a7be23e37c74aca8db3351b5117105aec01bf82..7f5564baa00f431d1c4fb4647515c8b29da277cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -99,7 +99,7 @@ abstract class BaseRelation {
    * large to broadcast.  This method will be called multiple times during query planning
    * and thus should not perform expensive operations for each invocation.
    */
-  def sizeInBytes = sqlContext.defaultSizeInBytes
+  def sizeInBytes = sqlContext.conf.defaultSizeInBytes
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 6bb81c76ed8bd030ba849e6dca97848df6cd0e9d..8c80be106f3cb0761dd97119ff8a46c54102c9f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -29,6 +29,7 @@ object TestSQLContext
       new SparkConf().set("spark.sql.testkey", "true"))) {
 
   /** Fewer partitions to speed up testing. */
-  override private[spark] def numShufflePartitions: Int =
-    getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+  private[sql] override lazy val conf: SQLConf = new SQLConf {
+    override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index c7e136388fce87eda005bceec36628b5d9a4a326..e5ab16f9dd661cb9b82b008cd2e393459b89cca5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -387,7 +387,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
   test("broadcasted left semi join operator selection") {
     clearCache()
     sql("CACHE TABLE testData")
-    val tmp = autoBroadcastJoinThreshold
+    val tmp = conf.autoBroadcastJoinThreshold
 
     sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=1000000000")
     Seq(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
index 60701f0e154f8eb551941be036f5b8876e266876..bf73d0c7074a542b6fa94d715a4cccde385985ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
@@ -37,7 +37,7 @@ class SQLConfSuite extends QueryTest with FunSuiteLike {
   }
 
   test("programmatic ways of basic setting and getting") {
-    clear()
+    conf.clear()
     assert(getAllConfs.size === 0)
 
     setConf(testKey, testVal)
@@ -51,11 +51,11 @@ class SQLConfSuite extends QueryTest with FunSuiteLike {
     assert(TestSQLContext.getConf(testKey, testVal + "_") == testVal)
     assert(TestSQLContext.getAllConfs.contains(testKey))
 
-    clear()
+    conf.clear()
   }
 
   test("parse SQL set commands") {
-    clear()
+    conf.clear()
     sql(s"set $testKey=$testVal")
     assert(getConf(testKey, testVal + "_") == testVal)
     assert(TestSQLContext.getConf(testKey, testVal + "_") == testVal)
@@ -73,11 +73,11 @@ class SQLConfSuite extends QueryTest with FunSuiteLike {
     sql(s"set $key=")
     assert(getConf(key, "0") == "")
 
-    clear()
+    conf.clear()
   }
 
   test("deprecated property") {
-    clear()
+    conf.clear()
     sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
     assert(getConf(SQLConf.SHUFFLE_PARTITIONS) == "10")
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index d9de5686dce480a023304308188bdcdd29ef9607..bc72daf0880a1fbefbe7cff6251d1797afa34dd7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -79,7 +79,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("aggregation with codegen") {
-    val originalValue = codegenEnabled
+    val originalValue = conf.codegenEnabled
     setConf(SQLConf.CODEGEN_ENABLED, "true")
     sql("SELECT key FROM testData GROUP BY key").collect()
     setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString)
@@ -245,14 +245,14 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("sorting") {
-    val before = externalSortEnabled
+    val before = conf.externalSortEnabled
     setConf(SQLConf.EXTERNAL_SORT, "false")
     sortTest()
     setConf(SQLConf.EXTERNAL_SORT, before.toString)
   }
 
   test("external sorting") {
-    val before = externalSortEnabled
+    val before = conf.externalSortEnabled
     setConf(SQLConf.EXTERNAL_SORT, "true")
     sortTest()
     setConf(SQLConf.EXTERNAL_SORT, before.toString)
@@ -600,7 +600,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("SET commands semantics using sql()") {
-    clear()
+    conf.clear()
     val testKey = "test.key.0"
     val testVal = "test.val.0"
     val nonexistentKey = "nonexistent"
@@ -632,7 +632,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       sql(s"SET $nonexistentKey"),
       Seq(Seq(s"$nonexistentKey=<undefined>"))
     )
-    clear()
+    conf.clear()
   }
 
   test("apply schema") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index fc95dccc74e27e19a3d7520165d99c3c99aff877..d94729ba92360de878f94e2461184e1b709b1bd0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -39,7 +39,8 @@ class InMemoryColumnarQuerySuite extends QueryTest {
     sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).registerTempTable("sizeTst")
     cacheTable("sizeTst")
     assert(
-      table("sizeTst").queryExecution.logical.statistics.sizeInBytes > autoBroadcastJoinThreshold)
+      table("sizeTst").queryExecution.logical.statistics.sizeInBytes >
+        conf.autoBroadcastJoinThreshold)
   }
 
   test("projection") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
index 1915c25392f1e97dc743071cc9e9feecc588fed9..592cafbbdc203c4c91078ff1fbc75145c2060358 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.test.TestSQLContext._
 
 class PartitionBatchPruningSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter {
-  val originalColumnBatchSize = columnBatchSize
-  val originalInMemoryPartitionPruning = inMemoryPartitionPruning
+  val originalColumnBatchSize = conf.columnBatchSize
+  val originalInMemoryPartitionPruning = conf.inMemoryPartitionPruning
 
   override protected def beforeAll(): Unit = {
     // Make a table with 5 partitions, 2 batches per partition, 10 elements per batch
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index a5af71acfc79afd7abf331babf94c370b353cd1b..c5b6fce5fd29757518fdb0cd8e410d7c20c66d54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -60,7 +60,7 @@ class PlannerSuite extends FunSuite {
   }
 
   test("sizeInBytes estimation of limit operator for broadcast hash join optimization") {
-    val origThreshold = autoBroadcastJoinThreshold
+    val origThreshold = conf.autoBroadcastJoinThreshold
     setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920.toString)
 
     // Using a threshold that is definitely larger than the small testing table (b) below
@@ -78,7 +78,7 @@ class PlannerSuite extends FunSuite {
   }
 
   test("InMemoryRelation statistics propagation") {
-    val origThreshold = autoBroadcastJoinThreshold
+    val origThreshold = conf.autoBroadcastJoinThreshold
     setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920.toString)
 
     testData.limit(3).registerTempTable("tiny")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 8dce3372a8db3c743b5a68b9cd5c2e6c7e2fbe00..b09f1ac49553bb1d76225f456d2a9400ac76f2b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -713,7 +713,7 @@ class JsonSuite extends QueryTest {
 
   test("Corrupt records") {
     // Test if we can query corrupt records.
-    val oldColumnNameOfCorruptRecord = TestSQLContext.columnNameOfCorruptRecord
+    val oldColumnNameOfCorruptRecord = TestSQLContext.conf.columnNameOfCorruptRecord
     TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
 
     val jsonSchemaRDD = jsonRDD(corruptRecords)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 10a01474e95be2e24b5de16d1c3490fada362303..6ac67fcafe16bcb13b234e8c620deb0fe3415eb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -213,7 +213,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
     def checkCompressionCodec(codec: CompressionCodecName): Unit = {
       withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) {
         withParquetFile(data) { path =>
-          assertResult(parquetCompressionCodec.toUpperCase) {
+          assertResult(conf.parquetCompressionCodec.toUpperCase) {
             compressionCodecFor(path)
           }
         }
@@ -221,7 +221,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
     }
 
     // Checks default compression codec
-    checkCompressionCodec(CompressionCodecName.fromConf(parquetCompressionCodec))
+    checkCompressionCodec(CompressionCodecName.fromConf(conf.parquetCompressionCodec))
 
     checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
     checkCompressionCodec(CompressionCodecName.GZIP)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index a5fe2e8da28400b7865b2fb0dd5c62e3ae6580d9..0a92336a3cb3926c22151c27874ecace19c664b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -88,7 +88,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
   TestData // Load test data tables.
 
   private var testRDD: SchemaRDD = null
-  private val originalParquetFilterPushdownEnabled = TestSQLContext.parquetFilterPushDown
+  private val originalParquetFilterPushdownEnabled = TestSQLContext.conf.parquetFilterPushDown
 
   override def beforeAll() {
     ParquetTestData.writeFile()
@@ -144,7 +144,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
   }
 
   ignore("Treat binary as string") {
-    val oldIsParquetBinaryAsString = TestSQLContext.isParquetBinaryAsString
+    val oldIsParquetBinaryAsString = TestSQLContext.conf.isParquetBinaryAsString
 
     // Create the test file.
     val file = getTempFilePath("parquet")
@@ -174,7 +174,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
   }
 
   test("Compression options for writing to a Parquetfile") {
-    val defaultParquetCompressionCodec = TestSQLContext.parquetCompressionCodec
+    val defaultParquetCompressionCodec = TestSQLContext.conf.parquetCompressionCodec
     import scala.collection.JavaConversions._
 
     val file = getTempFilePath("parquet")
@@ -186,7 +186,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     rdd.saveAsParquetFile(path)
     var actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
       .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
-    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+    assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
 
     parquetFile(path).registerTempTable("tmp")
     checkAnswer(
@@ -202,7 +202,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     rdd.saveAsParquetFile(path)
     actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
       .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
-    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+    assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
 
     parquetFile(path).registerTempTable("tmp")
     checkAnswer(
@@ -234,7 +234,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     rdd.saveAsParquetFile(path)
     actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
       .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
-    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+    assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
 
     parquetFile(path).registerTempTable("tmp")
     checkAnswer(
@@ -250,7 +250,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     rdd.saveAsParquetFile(path)
     actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
       .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
-    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+    assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
 
     parquetFile(path).registerTempTable("tmp")
     checkAnswer(
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 23283fd3fe6b1702cc3364b2c70b5aa8afb404dd..0d934620aca09a469aa4a8a25cb91f4c9358ae15 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -36,8 +36,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
 
   private val originalTimeZone = TimeZone.getDefault
   private val originalLocale = Locale.getDefault
-  private val originalColumnBatchSize = TestHive.columnBatchSize
-  private val originalInMemoryPartitionPruning = TestHive.inMemoryPartitionPruning
+  private val originalColumnBatchSize = TestHive.conf.columnBatchSize
+  private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
 
   def testCases = hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 09ff4cc5ab437dc8d3b2139818a3882dbcd35000..9aeebd7e543667445f1f048d6d6bd7bd2997fe55 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -71,8 +71,9 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) {
 class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   self =>
 
-  // Change the default SQL dialect to HiveQL
-  override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+  private[sql] override lazy val conf: SQLConf = new SQLConf {
+    override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+  }
 
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
@@ -87,12 +88,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
   override def sql(sqlText: String): SchemaRDD = {
     // TODO: Create a framework for registering parsers instead of just hardcoding if statements.
-    if (dialect == "sql") {
+    if (conf.dialect == "sql") {
       super.sql(sqlText)
-    } else if (dialect == "hiveql") {
+    } else if (conf.dialect == "hiveql") {
       new SchemaRDD(this, ddlParser(sqlText).getOrElse(HiveQl.parseSql(sqlText)))
     }  else {
-      sys.error(s"Unsupported SQL dialect: $dialect.  Try 'sql' or 'hiveql'")
+      sys.error(s"Unsupported SQL dialect: ${conf.dialect}.  Try 'sql' or 'hiveql'")
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index daeabb6c8bab8325875f8cabd5cf1b1636456a49..785a6a14f49f4ce88164250b26bc3d8b7ee12942 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -515,7 +515,7 @@ private[hive] case class MetastoreRelation
         // if the size is still less than zero, we use default size
         Option(totalSize).map(_.toLong).filter(_ > 0)
           .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
-          .getOrElse(sqlContext.defaultSizeInBytes)))
+          .getOrElse(sqlContext.conf.defaultSizeInBytes)))
     }
   )
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 31c7ce96398ebe6f191667a3427edcd30753051d..52e1f0d94fbd45fd4980e71b0f7c46938b39e0a7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -102,8 +102,10 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
     new this.QueryExecution { val logical = plan }
 
   /** Fewer partitions to speed up testing. */
-  override private[spark] def numShufflePartitions: Int =
-    getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+  private[sql] override lazy val conf: SQLConf = new SQLConf {
+    override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+    override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+  }
 
   /**
    * Returns the value of specified environmental variable as a [[java.io.File]] after checking
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
index 1817c7832490e0944d169b4aa1a3b1bcd46825b5..038f63f6c744116ff0e16cdae85f98dd8a88bb86 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
@@ -31,12 +31,12 @@ class JavaHiveContext(sqlContext: SQLContext) extends JavaSQLContext(sqlContext)
 
   override def sql(sqlText: String): JavaSchemaRDD = {
     // TODO: Create a framework for registering parsers instead of just hardcoding if statements.
-    if (sqlContext.dialect == "sql") {
+    if (sqlContext.conf.dialect == "sql") {
       super.sql(sqlText)
-    } else if (sqlContext.dialect == "hiveql") {
+    } else if (sqlContext.conf.dialect == "hiveql") {
       new JavaSchemaRDD(sqlContext, HiveQl.parseSql(sqlText))
     }  else {
-      sys.error(s"Unsupported SQL dialect: ${sqlContext.dialect}.  Try 'sql' or 'hiveql'")
+      sys.error(s"Unsupported SQL dialect: ${sqlContext.conf.dialect}.  Try 'sql' or 'hiveql'")
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index a758f921e04172b12623216abb9ec1e8963be87e..0b4e76c9d3d2fc47050ec9b5dcf07873bbc9de31 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -81,7 +81,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
 
     // TODO: How does it works? needs to add it back for other hive version.
     if (HiveShim.version =="0.12.0") {
-      assert(queryTotalSize("analyzeTable") === defaultSizeInBytes)
+      assert(queryTotalSize("analyzeTable") === conf.defaultSizeInBytes)
     }
     sql("ANALYZE TABLE analyzeTable COMPUTE STATISTICS noscan")
 
@@ -110,7 +110,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
         |SELECT * FROM src
       """.stripMargin).collect()
 
-    assert(queryTotalSize("analyzeTable_part") === defaultSizeInBytes)
+    assert(queryTotalSize("analyzeTable_part") === conf.defaultSizeInBytes)
 
     sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan")
 
@@ -151,8 +151,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
       val sizes = rdd.queryExecution.analyzed.collect {
         case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
       }
-      assert(sizes.size === 2 && sizes(0) <= autoBroadcastJoinThreshold
-        && sizes(1) <= autoBroadcastJoinThreshold,
+      assert(sizes.size === 2 && sizes(0) <= conf.autoBroadcastJoinThreshold
+        && sizes(1) <= conf.autoBroadcastJoinThreshold,
         s"query should contain two relations, each of which has size smaller than autoConvertSize")
 
       // Using `sparkPlan` because for relevant patterns in HashJoin to be
@@ -163,8 +163,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
 
       checkAnswer(rdd, expectedAnswer) // check correctness of output
 
-      TestHive.settings.synchronized {
-        val tmp = autoBroadcastJoinThreshold
+      TestHive.conf.settings.synchronized {
+        val tmp = conf.autoBroadcastJoinThreshold
 
         sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""")
         rdd = sql(query)
@@ -207,8 +207,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
         .isAssignableFrom(r.getClass) =>
         r.statistics.sizeInBytes
     }
-    assert(sizes.size === 2 && sizes(1) <= autoBroadcastJoinThreshold
-      && sizes(0) <= autoBroadcastJoinThreshold,
+    assert(sizes.size === 2 && sizes(1) <= conf.autoBroadcastJoinThreshold
+      && sizes(0) <= conf.autoBroadcastJoinThreshold,
       s"query should contain two relations, each of which has size smaller than autoConvertSize")
 
     // Using `sparkPlan` because for relevant patterns in HashJoin to be
@@ -221,8 +221,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(rdd, answer) // check correctness of output
 
-    TestHive.settings.synchronized {
-      val tmp = autoBroadcastJoinThreshold
+    TestHive.conf.settings.synchronized {
+      val tmp = conf.autoBroadcastJoinThreshold
 
       sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1")
       rdd = sql(leftSemiJoinQuery)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 4decd1548534bb529d28b07e8fed80cb96d851ca..c14f0d24e0dc30a378249788d2c7b55c86dbf493 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -847,7 +847,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
         case Row(key: String, value: String) => key -> value
         case Row(KV(key, value)) => key -> value
       }.toSet
-    clear()
+    conf.clear()
 
     // "SET" itself returns all config variables currently specified in SQLConf.
     // TODO: Should we be listing the default here always? probably...
@@ -879,7 +879,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
       collectResults(sql(s"SET $nonexistentKey"))
     }
 
-    clear()
+    conf.clear()
   }
 
   createQueryTest("select from thrift based table",