diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 3d468d804622cde42f522f3b795a15e482dc7402..bd4e99492b395c2cb0116d44ba75684652eee2e8 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -17,11 +17,8 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import scala.collection.JavaConversions._
-
 import org.apache.commons.logging.LogFactory
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
 import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
 
@@ -51,24 +48,14 @@ object HiveThriftServer2 extends Logging {
 
   def main(args: Array[String]) {
     val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
-
     if (!optionsProcessor.process(args)) {
       System.exit(-1)
     }
 
-    val ss = new SessionState(new HiveConf(classOf[SessionState]))
-
-    // Set all properties specified via command line.
-    val hiveConf: HiveConf = ss.getConf
-    hiveConf.getAllProperties.toSeq.sortBy(_._1).foreach { case (k, v) =>
-      logDebug(s"HiveConf var: $k=$v")
-    }
-
-    SessionState.start(ss)
-
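+    // SparkSQLEnv.init() creates the shared HiveContext, which creates and starts the Hive
+    // SessionState lazily on first use, so no explicit SessionState setup is needed here.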
     logInfo("Starting SparkContext")
     SparkSQLEnv.init()
-    SessionState.start(ss)
 
     Runtime.getRuntime.addShutdownHook(
       new Thread() {
@@ -80,7 +67,9 @@ object HiveThriftServer2 extends Logging {
 
     try {
       val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
-      server.init(hiveConf)
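+      // Initializes the Thrift server with the HiveConf owned by the shared HiveContext's
+      // SessionState instead of a separately constructed HiveConf.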
+      server.init(SparkSQLEnv.hiveContext.hiveconf)
       server.start()
       logInfo("HiveThriftServer2 started")
     } catch {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 2136a2ea63543f825c5654e07ae1982cd3f0e6ee..50425863518c3883ae4b008d5f39df951de59f72 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import org.apache.hadoop.hive.ql.session.SessionState
-
-import org.apache.spark.scheduler.{SplitInfo, StatsReportListener}
-import org.apache.spark.Logging
+import scala.collection.JavaConversions._
+
+import org.apache.spark.scheduler.StatsReportListener
 import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
 
 /** A singleton object for the master program. The slaves should not access this. */
 private[hive] object SparkSQLEnv extends Logging {
@@ -37,14 +36,14 @@ private[hive] object SparkSQLEnv extends Logging {
         .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}"))
 
       sparkContext.addSparkListener(new StatsReportListener())
+      hiveContext = new HiveContext(sparkContext)
 
-      hiveContext = new HiveContext(sparkContext) {
-        @transient override lazy val sessionState = {
-          val state = SessionState.get()
-          setConf(state.getConf.getAllProperties)
-          state
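+      // When debug logging is enabled, dumps all HiveConf properties in sorted order. Accessing
+      // `hiveconf` also forces the lazy SessionState inside HiveContext to initialize.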
+      if (log.isDebugEnabled) {
+        hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) =>
+          logDebug(s"HiveConf var: $k=$v")
         }
-        @transient override lazy val hiveconf = sessionState.getConf
       }
     }
   }
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index e3b4e45a3d68c84b2ce12f1d9c78c3fdea40d23b..c60e8fa5b125914df92155e6990d62bf40b98513 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -150,10 +150,14 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
       val dataFilePath =
         Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
 
-      val queries = Seq(
-        "CREATE TABLE test(key INT, val STRING)",
-        s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test",
-        "CACHE TABLE test")
+      val queries =
+        s"""SET spark.sql.shuffle.partitions=3;
+           |CREATE TABLE test(key INT, val STRING);
+           |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
+           |CACHE TABLE test;
+         """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty)
 
       queries.foreach(statement.execute)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index fad4091d48a89cad752664c0af10dfd344ab6a5a..ff8fa44194d98fcee9d61ede9757c2317ceb838f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -224,21 +224,31 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   }
 
   /**
-   * SQLConf and HiveConf contracts: when the hive session is first initialized, params in
-   * HiveConf will get picked up by the SQLConf.  Additionally, any properties set by
-   * set() or a SET command inside sql() will be set in the SQLConf *as well as*
-   * in the HiveConf.
+   * SQLConf and HiveConf contracts:
+   *
+   * 1. Reuse the existing started SessionState if one is available.
+   * 2. When the Hive session is first initialized, params in HiveConf are picked up by the
+   *    SQLConf.  Additionally, any properties set by set() or a SET command inside sql() are
+   *    set in the SQLConf *as well as* in the HiveConf.
    */
-  @transient lazy val hiveconf = new HiveConf(classOf[SessionState])
-  @transient protected[hive] lazy val sessionState = {
-    val ss = new SessionState(hiveconf)
-    setConf(hiveconf.getAllProperties)  // Have SQLConf pick up the initial set of HiveConf.
-    SessionState.start(ss)
-    ss.err = new PrintStream(outputBuffer, true, "UTF-8")
-    ss.out = new PrintStream(outputBuffer, true, "UTF-8")
-
-    ss
-  }
+  @transient protected[hive] lazy val (hiveconf, sessionState) =
+    Option(SessionState.get())
+      .orElse {
+        val newState = new SessionState(new HiveConf(classOf[SessionState]))
+        // Only starts the newly created `SessionState` instance; any existing instance returned
+        // by `SessionState.get()` must already be the most recently started one.
+        SessionState.start(newState)
+        Some(newState)
+      }
+      .map { state =>
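+        // Have SQLConf pick up the initial set of HiveConf properties.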
+        setConf(state.getConf.getAllProperties)
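+        // Redirects the session's output streams to `outputBuffer` unless they are already set.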
+        if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
+        if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
+        (state.getConf, state)
+      }
+      .get
 
   override def setConf(key: String, value: String): Unit = {
     super.setConf(key, value)
@@ -288,6 +298,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
       val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf)
 
+      // Makes sure the session represented by the `sessionState` field is activated. This implies
+      // that Spark SQL Hive support uses a single `SessionState` for all Hive operations, which
+      // breaks session isolation under multi-user scenarios (e.g. HiveThriftServer2).
+      // TODO Fix session isolation
+      if (SessionState.get() != sessionState) {
+        SessionState.start(sessionState)
+      }
+
       proc match {
         case driver: Driver =>
           val results = HiveShim.createDriverResultsArray