diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 9c49e84bf9680362c0f2e3a6a1dcbcb2ef9f711d..297d0d644a423ea51b2c7a5f2c9aca1524188689 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -63,8 +63,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
 
-  // Note that this is a lazy val so we can override the default value in subclasses.
-  protected[sql] lazy val conf: SQLConf = new SQLConf
+  /**
+   * @return the Spark SQL configuration of the SQLSession bound to the current thread
+   */
+  protected[sql] def conf = tlSession.get().conf
 
   /**
    * Set Spark SQL configuration properties.
@@ -103,9 +105,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
 
+  // TODO: how should temp tables be handled per user session?
   @transient
   protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
 
+  // TODO: how should temp functions be handled per user session?
   @transient
   protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry(true)
 
@@ -138,6 +142,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)
 
+  @transient
+  protected[sql] val tlSession = new ThreadLocal[SQLSession]() {
+    override def initialValue = defaultSession
+  }
+
+  @transient
+  protected[sql] val defaultSession = createSession()
+
   sparkContext.getConf.getAll.foreach {
     case (key, value) if key.startsWith("spark.sql") => setConf(key, value)
     case _ =>
@@ -194,6 +206,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * }}}
    *
    * @group basic
+   * TODO move to SQLSession?
    */
   @transient
   val udf: UDFRegistration = new UDFRegistration(this)
@@ -1059,6 +1072,32 @@ class SQLContext(@transient val sparkContext: SparkContext)
     )
   }
 
+
+  protected[sql] def openSession(): SQLSession = {
+    detachSession()
+    val session = createSession()
+    tlSession.set(session)
+
+    session
+  }
+
+  protected[sql] def currentSession(): SQLSession = {
+    tlSession.get()
+  }
+
+  protected[sql] def createSession(): SQLSession = {
+    new this.SQLSession()
+  }
+
+  protected[sql] def detachSession(): Unit = {
+    tlSession.remove()
+  }
+
+  protected[sql] class SQLSession {
+    // Note that this is a lazy val so we can override the default value in subclasses.
+    protected[sql] lazy val conf: SQLConf = new SQLConf
+  }
+
   /**
    * :: DeveloperApi ::
    * The primary workflow for executing relational queries using Spark.  Designed to allow easy
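Editorial note: a minimal sketch of how the new thread-local session API is meant to be driven by a server-side caller. The helper below is hypothetical (not part of this patch); it must live under the org.apache.spark.sql package tree, since openSession()/detachSession() are protected[sql] (the thrift-server shims rely on the same visibility).

    package org.apache.spark.sql.hive.thriftserver

    import org.apache.spark.sql.SQLContext

    // Hypothetical helper (illustration only): bind a fresh SQLSession to the
    // current thread for the duration of one connection, so SET commands only
    // touch that session's SQLConf.
    object SessionLifecycleSketch {
      def withThreadSession[T](ctx: SQLContext)(body: => T): T = {
        ctx.openSession()            // new SQLSession stored in ctx.tlSession for this thread
        try body
        finally ctx.detachSession()  // the thread falls back to ctx.defaultSession afterwards
      }
    }
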
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 4e1ec38bd015894bc1119ee6f5ba3259318c1c77..356a6100d2cf5a8f986b79fe3e5f9720525a3ca1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -24,16 +24,22 @@ import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 /** A SQLContext that can be used for local testing. */
-object TestSQLContext
+class LocalSQLContext
   extends SQLContext(
     new SparkContext(
       "local[2]",
       "TestSQLContext",
       new SparkConf().set("spark.sql.testkey", "true"))) {
 
-  /** Fewer partitions to speed up testing. */
-  protected[sql] override lazy val conf: SQLConf = new SQLConf {
-    override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+  override protected[sql] def createSession(): SQLSession = {
+    new this.SQLSession()
+  }
+
+  protected[sql] class SQLSession extends super.SQLSession {
+    protected[sql] override lazy val conf: SQLConf = new SQLConf {
+      /** Fewer partitions to speed up testing. */
+      override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+    }
   }
 
   /**
@@ -45,3 +51,6 @@ object TestSQLContext
   }
 
 }
+
+object TestSQLContext extends LocalSQLContext
+
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
deleted file mode 100644
index 89e9ede7261c914ba4b35d04010ae4a1caa8cf84..0000000000000000000000000000000000000000
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.thriftserver
-
-import java.util.concurrent.Executors
-
-import org.apache.commons.logging.Log
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hive.service.cli.session.SessionManager
-
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
-import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
-import org.apache.hive.service.cli.SessionHandle
-
-private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
-  extends SessionManager
-  with ReflectedCompositeService {
-
-  private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
-
-  override def init(hiveConf: HiveConf) {
-    setSuperField(this, "hiveConf", hiveConf)
-
-    val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
-    setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
-    getAncestorField[Log](this, 3, "LOG").info(
-      s"HiveServer2: Async execution pool size $backgroundPoolSize")
-
-    setSuperField(this, "operationManager", sparkSqlOperationManager)
-    addService(sparkSqlOperationManager)
-
-    initCompositeService(hiveConf)
-  }
-
-  override def closeSession(sessionHandle: SessionHandle) {
-    super.closeSession(sessionHandle)
-    sparkSqlOperationManager.sessionToActivePool -= sessionHandle
-  }
-}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index d783d487b5c60137e32b141c84e6696f9e5201e6..aff96e21a5373e1569f11fbbe057dd8109062581 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -195,6 +195,146 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       }
     }
   }
+
+  test("test multiple session") {
+    import org.apache.spark.sql.SQLConf
+    var defaultV1: String = null
+    var defaultV2: String = null
+
+    withMultipleConnectionJdbcStatement(
+      // create table
+      { statement =>
+
+        val queries = Seq(
+            "DROP TABLE IF EXISTS test_map",
+            "CREATE TABLE test_map(key INT, value STRING)",
+            s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map",
+            "CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC")
+
+        queries.foreach(statement.execute)
+
+        val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
+        val buf1 = new collection.mutable.ArrayBuffer[Int]()
+        while (rs1.next()) {
+          buf1 += rs1.getInt(1)
+        }
+        rs1.close()
+
+        val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
+        val buf2 = new collection.mutable.ArrayBuffer[Int]()
+        while (rs2.next()) {
+          buf2 += rs2.getInt(1)
+        }
+        rs2.close()
+
+        assert(buf1 === buf2)
+      },
+
+      // first session: read the default values of the session state
+      { statement =>
+
+        val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}")
+        rs1.next()
+        defaultV1 = rs1.getString(1)
+        assert(defaultV1 != "200")
+        rs1.close()
+
+        val rs2 = statement.executeQuery("SET hive.cli.print.header")
+        rs2.next()
+
+        defaultV2 = rs2.getString(1)
+        assert(defaultV1 != "true")
+        rs2.close()
+      },
+
+      // second session: update the session state
+      { statement =>
+
+        val queries = Seq(
+            s"SET ${SQLConf.SHUFFLE_PARTITIONS}=291",
+            "SET hive.cli.print.header=true"
+            )
+
+      queries.foreach(statement.execute)
+        val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}")
+        rs1.next()
+        assert("spark.sql.shuffle.partitions=291" === rs1.getString(1))
+        rs1.close()
+
+        val rs2 = statement.executeQuery("SET hive.cli.print.header")
+        rs2.next()
+        assert("hive.cli.print.header=true" === rs2.getString(1))
+        rs2.close()
+      },
+
+      // third session: a fresh session should see the default values again,
+      // not the updates made in the second session
+      { statement =>
+
+        val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}")
+        rs1.next()
+        assert(defaultV1 === rs1.getString(1))
+        rs1.close()
+
+        val rs2 = statement.executeQuery("SET hive.cli.print.header")
+        rs2.next()
+        assert(defaultV2 === rs2.getString(1))
+        rs2.close()
+      },
+
+      // accessing the cached data in another session
+      { statement =>
+
+        val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
+        val buf1 = new collection.mutable.ArrayBuffer[Int]()
+        while (rs1.next()) {
+          buf1 += rs1.getInt(1)
+        }
+        rs1.close()
+
+        val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
+        val buf2 = new collection.mutable.ArrayBuffer[Int]()
+        while (rs2.next()) {
+          buf2 += rs2.getInt(1)
+        }
+        rs2.close()
+
+        assert(buf1 === buf2)
+        statement.executeQuery("UNCACHE TABLE test_table")
+
+        // TODO: figure out how to verify whether the data was loaded from the cache
+        val rs3 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
+        val buf3 = new collection.mutable.ArrayBuffer[Int]()
+        while (rs3.next()) {
+          buf3 += rs3.getInt(1)
+        }
+        rs3.close()
+
+        assert(buf1 === buf3)
+      },
+
+      // accessing the table after it has been uncached
+      { statement =>
+
+        // TODO: figure out how to verify whether the data was loaded from the cache
+        val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
+        val buf1 = new collection.mutable.ArrayBuffer[Int]()
+        while (rs1.next()) {
+          buf1 += rs1.getInt(1)
+        }
+        rs1.close()
+
+        val rs2 = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
+        val buf2 = new collection.mutable.ArrayBuffer[Int]()
+        while (rs2.next()) {
+          buf2 += rs2.getInt(1)
+        }
+        rs2.close()
+
+        assert(buf1 === buf2)
+      }
+    )
+  }
 }
 
 class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
@@ -245,15 +385,22 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test {
     s"jdbc:hive2://localhost:$serverPort/"
   }
 
-  protected def withJdbcStatement(f: Statement => Unit): Unit = {
-    val connection = DriverManager.getConnection(jdbcUri, user, "")
-    val statement = connection.createStatement()
-
-    try f(statement) finally {
-      statement.close()
-      connection.close()
+  def withMultipleConnectionJdbcStatement(fs: (Statement => Unit)*) {
+    val user = System.getProperty("user.name")
+    val connections = fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") }
+    val statements = connections.map(_.createStatement())
+
+    try {
+      statements.zip(fs).foreach { case (s, f) => f(s) }
+    } finally {
+      statements.foreach(_.close())
+      connections.foreach(_.close())
     }
   }
+
+  def withJdbcStatement(f: Statement => Unit) {
+    withMultipleConnectionJdbcStatement(f)
+  }
 }
 
 abstract class HiveThriftServer2Test extends FunSuite with BeforeAndAfterAll with Logging {
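Editorial note: a minimal usage sketch of the new withMultipleConnectionJdbcStatement helper (values are illustrative, and the key=value result format is the one the test above already asserts). Each block gets its own JDBC connection and therefore its own server-side session, so a SET issued in the first block should not be visible in the second. The sketch assumes it is placed inside a test body of a HiveThriftJdbcTest subclass.

    withMultipleConnectionJdbcStatement(
      { statement =>
        statement.execute("SET spark.sql.shuffle.partitions=7")
        val rs = statement.executeQuery("SET spark.sql.shuffle.partitions")
        rs.next()
        assert(rs.getString(1) === "spark.sql.shuffle.partitions=7")  // visible in this session
        rs.close()
      },
      { statement =>
        val rs = statement.executeQuery("SET spark.sql.shuffle.partitions")
        rs.next()
        assert(rs.getString(1) != "spark.sql.shuffle.partitions=7")   // fresh session, default again
        rs.close()
      }
    )
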
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index 13116b40bb2595eede4409d7aea7f6077c66689c..95a6e86d0546d4daa04b7b39a7cfe2a4a4b14bcb 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -18,8 +18,15 @@
 package org.apache.spark.sql.hive.thriftserver
 
 import java.sql.{Date, Timestamp}
+import java.util.concurrent.Executors
 import java.util.{ArrayList => JArrayList, Map => JMap}
 
+import org.apache.commons.logging.Log
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hive.service.cli.thrift.TProtocolVersion
+import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, Map => SMap}
 
@@ -29,7 +36,7 @@ import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
-import org.apache.hive.service.cli.session.HiveSession
+import org.apache.hive.service.cli.session.{SessionManager, HiveSession}
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.{DataFrame, SQLConf, Row => SparkRow}
@@ -220,3 +227,42 @@ private[hive] class SparkExecuteStatementOperation(
     setState(OperationState.FINISHED)
   }
 }
+
+private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
+  extends SessionManager
+  with ReflectedCompositeService {
+
+  private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
+
+  override def init(hiveConf: HiveConf) {
+    setSuperField(this, "hiveConf", hiveConf)
+
+    val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
+    setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
+    getAncestorField[Log](this, 3, "LOG").info(
+      s"HiveServer2: Async execution pool size $backgroundPoolSize")
+
+    setSuperField(this, "operationManager", sparkSqlOperationManager)
+    addService(sparkSqlOperationManager)
+
+    initCompositeService(hiveConf)
+  }
+
+  override def openSession(
+      username: String,
+      passwd: String,
+      sessionConf: java.util.Map[String, String],
+      withImpersonation: Boolean,
+      delegationToken: String): SessionHandle = {
+    hiveContext.openSession()
+
+    super.openSession(username, passwd, sessionConf, withImpersonation, delegationToken)
+  }
+
+  override def closeSession(sessionHandle: SessionHandle) {
+    super.closeSession(sessionHandle)
+    sparkSqlOperationManager.sessionToActivePool -= sessionHandle
+
+    hiveContext.detachSession()
+  }
+}
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 9b8faeff94eab59d9d2470c12a18d5b03b990039..178eb1af7cdcd75b7f20aa6649c31682cb43e571 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -18,8 +18,15 @@
 package org.apache.spark.sql.hive.thriftserver
 
 import java.sql.{Date, Timestamp}
+import java.util.concurrent.Executors
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
+import org.apache.commons.logging.Log
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hive.service.cli.thrift.TProtocolVersion
+import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, Map => SMap}
 
@@ -27,7 +34,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
-import org.apache.hive.service.cli.session.HiveSession
+import org.apache.hive.service.cli.session.{SessionManager, HiveSession}
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
@@ -191,3 +198,43 @@ private[hive] class SparkExecuteStatementOperation(
     setState(OperationState.FINISHED)
   }
 }
+
+private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
+  extends SessionManager
+  with ReflectedCompositeService {
+
+  private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
+
+  override def init(hiveConf: HiveConf) {
+    setSuperField(this, "hiveConf", hiveConf)
+
+    val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
+    setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
+    getAncestorField[Log](this, 3, "LOG").info(
+      s"HiveServer2: Async execution pool size $backgroundPoolSize")
+
+    setSuperField(this, "operationManager", sparkSqlOperationManager)
+    addService(sparkSqlOperationManager)
+
+    initCompositeService(hiveConf)
+  }
+
+  override def openSession(
+      protocol: TProtocolVersion,
+      username: String,
+      passwd: String,
+      sessionConf: java.util.Map[String, String],
+      withImpersonation: Boolean,
+      delegationToken: String): SessionHandle = {
+    hiveContext.openSession()
+
+    super.openSession(protocol, username, passwd, sessionConf, withImpersonation, delegationToken)
+  }
+
+  override def closeSession(sessionHandle: SessionHandle) {
+    super.closeSession(sessionHandle)
+    sparkSqlOperationManager.sessionToActivePool -= sessionHandle
+
+    hiveContext.detachSession()
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c439dfe0a71f88c8ce76f038eb0539675e890bbc..a5c435fdfa7785e2eae000e2818e8de4356260e8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -49,10 +49,6 @@ import org.apache.spark.sql.types._
 class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   self =>
 
-  protected[sql] override lazy val conf: SQLConf = new SQLConf {
-    override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
-  }
-
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
@@ -214,33 +210,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     }
   }
 
-  /**
-   * SQLConf and HiveConf contracts:
-   *
-   * 1. reuse existing started SessionState if any
-   * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
-   *    SQLConf.  Additionally, any properties set by set() or a SET command inside sql() will be
-   *    set in the SQLConf *as well as* in the HiveConf.
-   */
-  @transient protected[hive] lazy val sessionState: SessionState = {
-    var state = SessionState.get()
-    if (state == null) {
-      state = new SessionState(new HiveConf(classOf[SessionState]))
-      SessionState.start(state)
-    }
-    if (state.out == null) {
-      state.out = new PrintStream(outputBuffer, true, "UTF-8")
-    }
-    if (state.err == null) {
-      state.err = new PrintStream(outputBuffer, true, "UTF-8")
-    }
-    state
-  }
+  protected[hive] def sessionState = tlSession.get().asInstanceOf[this.SQLSession].sessionState
 
-  @transient protected[hive] lazy val hiveconf: HiveConf = {
-    setConf(sessionState.getConf.getAllProperties)
-    sessionState.getConf
-  }
+  protected[hive] def hiveconf = tlSession.get().asInstanceOf[this.SQLSession].hiveconf
 
   override def setConf(key: String, value: String): Unit = {
     super.setConf(key, value)
@@ -272,6 +244,44 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         Nil
     }
 
+  override protected[sql] def createSession(): SQLSession = {
+    new this.SQLSession()
+  }
+
+  protected[hive] class SQLSession extends super.SQLSession {
+    protected[sql] override lazy val conf: SQLConf = new SQLConf {
+      override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+    }
+
+    protected[hive] lazy val hiveconf: HiveConf = {
+      setConf(sessionState.getConf.getAllProperties)
+      sessionState.getConf
+    }
+
+    /**
+     * SQLConf and HiveConf contracts:
+     *
+     * 1. reuse existing started SessionState if any
+     * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
+     *    SQLConf.  Additionally, any properties set by set() or a SET command inside sql() will be
+     *    set in the SQLConf *as well as* in the HiveConf.
+     */
+    protected[hive] lazy val sessionState: SessionState = {
+      var state = SessionState.get()
+      if (state == null) {
+        state = new SessionState(new HiveConf(classOf[SessionState]))
+        SessionState.start(state)
+      }
+      if (state.out == null) {
+        state.out = new PrintStream(outputBuffer, true, "UTF-8")
+      }
+      if (state.err == null) {
+        state.err = new PrintStream(outputBuffer, true, "UTF-8")
+      }
+      state
+    }
+  }
+
   /**
    * Runs the specified SQL query using Hive.
    */
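Editorial note: a short sketch of contract (2) from the comment moved above — a property set through a SET command lands in both the session's SQLConf and its HiveConf. This is illustrative only; it assumes the code sits under the org.apache.spark.sql.hive package (hiveconf is protected[hive]) and that `hive` is an existing HiveContext instance.

    // Sketch, assuming `hive: HiveContext` and org.apache.spark.sql.hive visibility.
    hive.sql("SET hive.exec.dynamic.partition=true").collect()
    assert(hive.getConf("hive.exec.dynamic.partition") == "true")       // SQLConf side
    assert(hive.hiveconf.get("hive.exec.dynamic.partition") == "true")  // HiveConf side
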
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index a2d99f1f4b28d7465ed14a3e07ec73bda07846d8..4859991e2351a9fbeeedba2b27c8c2c22a655059 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -102,10 +102,16 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   override def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution(plan)
 
-  /** Fewer partitions to speed up testing. */
-  protected[sql] override lazy val conf: SQLConf = new SQLConf {
-    override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
-    override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+  override protected[sql] def createSession(): SQLSession = {
+    new this.SQLSession()
+  }
+
+  protected[hive] class SQLSession extends super.SQLSession {
+    /** Fewer partitions to speed up testing. */
+    protected[sql] override lazy val conf: SQLConf = new SQLConf {
+      override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+      override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+    }
   }
 
   /**