diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 6e02d6564b0021dbe5bdc23efcad2b63b0ed92d2..e347754055e799ccbe65d012ffa543a2c58b62a5 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -2051,6 +2051,20 @@ options.
 
 # Migration Guide
 
+## Upgrading From Spark SQL 1.5 to 1.6
+
+ - From Spark 1.6, by default the Thrift server runs in multi-session mode.  Which means each JDBC/ODBC
+   connection owns a copy of their own SQL configuration and temporary function registry.  Cached
+   tables are still shared though.  If you prefer to run the Thrift server in the old single-session
+   mode, please set option `spark.sql.hive.thriftServer.singleSession` to `true`.  You may either add
+   this option to `spark-defaults.conf`, or pass it to `start-thriftserver.sh` via `--conf`:
+
+   {% highlight bash %}
+   ./sbin/start-thriftserver.sh \
+     --conf spark.sql.hive.thriftServer.singleSession=true \
+     ...
+   {% endhighlight %}
+
 ## Upgrading From Spark SQL 1.4 to 1.5
 
  - Optimized execution using manually managed memory (Tungsten) is now enabled by default, along with
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index 33aaead3fbf96dc4fd4124d7370cd4fb4d099111..af4fcdf021bd4c49b886f33d39204e33b4ef6b63 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -66,7 +66,11 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
     val session = super.getSession(sessionHandle)
     HiveThriftServer2.listener.onSessionCreated(
       session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
-    val ctx = hiveContext.newSession()
+    val ctx = if (hiveContext.hiveThriftServerSingleSession) {
+      hiveContext
+    } else {
+      hiveContext.newSession()
+    }
     ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
     sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
     sessionHandle
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index eb1895f263d70b1efe8e10bfada03c496ac2505b..1dd898aa38350764ceb9957d2dee37682213bd44 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -41,7 +41,6 @@ import org.apache.thrift.transport.TSocket
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.Utils
 import org.apache.spark.{Logging, SparkFunSuite}
@@ -510,6 +509,53 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 }
 
+class SingleSessionSuite extends HiveThriftJdbcTest {
+  override def mode: ServerMode.Value = ServerMode.binary
+
+  override protected def extraConf: Seq[String] =
+    "--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil
+
+  test("test single session") {
+    withMultipleConnectionJdbcStatement(
+      { statement =>
+        val jarPath = "../hive/src/test/resources/TestUDTF.jar"
+        val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
+
+        // Configurations and temporary functions added in this session should be visible to all
+        // the other sessions.
+        Seq(
+          "SET foo=bar",
+          s"ADD JAR $jarURL",
+          s"""CREATE TEMPORARY FUNCTION udtf_count2
+              |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+           """.stripMargin
+        ).foreach(statement.execute)
+      },
+
+      { statement =>
+        val rs1 = statement.executeQuery("SET foo")
+
+        assert(rs1.next())
+        assert(rs1.getString(1) === "foo")
+        assert(rs1.getString(2) === "bar")
+
+        val rs2 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")
+
+        assert(rs2.next())
+        assert(rs2.getString(1) === "Function: udtf_count2")
+
+        assert(rs2.next())
+        assertResult("Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
+          rs2.getString(1)
+        }
+
+        assert(rs2.next())
+        assert(rs2.getString(1) === "Usage: To be added.")
+      }
+    )
+  }
+}
+
 class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
   override def mode: ServerMode.Value = ServerMode.http
 
@@ -600,6 +646,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
   private var logTailingProcess: Process = _
   private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]
 
+  protected def extraConf: Seq[String] = Nil
+
   protected def serverStartCommand(port: Int) = {
     val portConf = if (mode == ServerMode.binary) {
       ConfVars.HIVE_SERVER2_THRIFT_PORT
@@ -635,6 +683,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
        |  --driver-class-path $driverClassPath
        |  --driver-java-options -Dlog4j.debug
        |  --conf spark.ui.enabled=false
+       |  ${extraConf.mkString("\n")}
      """.stripMargin.split("\\s+").toSeq
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 2004f24ad26c667d1ce9b18e2c69c62d4121a189..c0bb5af7d5c85365d48a8db194a9be0d30be3289 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -190,6 +190,9 @@ class HiveContext private[hive](
    */
   protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)
 
+  protected[hive] def hiveThriftServerSingleSession: Boolean =
+    sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean
+
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()