From 7b1407c7b95c43299a30e891748824c4bc47e43b Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Tue, 17 Nov 2015 11:17:52 -0800
Subject: [PATCH] [SPARK-11089][SQL] Adds option for disabling multi-session in
 Thrift server

This PR adds a new option `spark.sql.hive.thriftServer.singleSession` for disabling multi-session support in the Thrift server.

Note that this option is added as a Spark configuration (retrieved from `SparkConf`) rather than Spark SQL configuration (retrieved from `SQLConf`). This is because all SQL configurations are session-ized. Since multi-session support is by default on, no JDBC connection can modify global configurations like the newly added one.

Author: Cheng Lian <lian@databricks.com>

Closes #9740 from liancheng/spark-11089.single-session-option.
---
 docs/sql-programming-guide.md                 | 14 +++++
 .../thriftserver/SparkSQLSessionManager.scala |  6 ++-
 .../HiveThriftServer2Suites.scala             | 51 ++++++++++++++++++-
 .../apache/spark/sql/hive/HiveContext.scala   |  3 ++
 4 files changed, 72 insertions(+), 2 deletions(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 6e02d6564b..e347754055 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -2051,6 +2051,20 @@ options.
 
 # Migration Guide
 
+## Upgrading From Spark SQL 1.5 to 1.6
+
+ - From Spark 1.6, by default the Thrift server runs in multi-session mode.  Which means each JDBC/ODBC
+   connection owns a copy of their own SQL configuration and temporary function registry.  Cached
+   tables are still shared though.  If you prefer to run the Thrift server in the old single-session
+   mode, please set option `spark.sql.hive.thriftServer.singleSession` to `true`.  You may either add
+   this option to `spark-defaults.conf`, or pass it to `start-thriftserver.sh` via `--conf`:
+
+   {% highlight bash %}
+   ./sbin/start-thriftserver.sh \
+     --conf spark.sql.hive.thriftServer.singleSession=true \
+     ...
+   {% endhighlight %}
+
 ## Upgrading From Spark SQL 1.4 to 1.5
 
  - Optimized execution using manually managed memory (Tungsten) is now enabled by default, along with
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index 33aaead3fb..af4fcdf021 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -66,7 +66,11 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
     val session = super.getSession(sessionHandle)
     HiveThriftServer2.listener.onSessionCreated(
       session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
-    val ctx = hiveContext.newSession()
+    val ctx = if (hiveContext.hiveThriftServerSingleSession) {
+      hiveContext
+    } else {
+      hiveContext.newSession()
+    }
     ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
     sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
     sessionHandle
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index eb1895f263..1dd898aa38 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -41,7 +41,6 @@ import org.apache.thrift.transport.TSocket
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.Utils
 import org.apache.spark.{Logging, SparkFunSuite}
@@ -510,6 +509,53 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 }
 
+class SingleSessionSuite extends HiveThriftJdbcTest {
+  override def mode: ServerMode.Value = ServerMode.binary
+
+  override protected def extraConf: Seq[String] =
+    "--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil
+
+  test("test single session") {
+    withMultipleConnectionJdbcStatement(
+      { statement =>
+        val jarPath = "../hive/src/test/resources/TestUDTF.jar"
+        val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
+
+        // Configurations and temporary functions added in this session should be visible to all
+        // the other sessions.
+        Seq(
+          "SET foo=bar",
+          s"ADD JAR $jarURL",
+          s"""CREATE TEMPORARY FUNCTION udtf_count2
+              |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+           """.stripMargin
+        ).foreach(statement.execute)
+      },
+
+      { statement =>
+        val rs1 = statement.executeQuery("SET foo")
+
+        assert(rs1.next())
+        assert(rs1.getString(1) === "foo")
+        assert(rs1.getString(2) === "bar")
+
+        val rs2 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")
+
+        assert(rs2.next())
+        assert(rs2.getString(1) === "Function: udtf_count2")
+
+        assert(rs2.next())
+        assertResult("Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
+          rs2.getString(1)
+        }
+
+        assert(rs2.next())
+        assert(rs2.getString(1) === "Usage: To be added.")
+      }
+    )
+  }
+}
+
 class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
   override def mode: ServerMode.Value = ServerMode.http
 
@@ -600,6 +646,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
   private var logTailingProcess: Process = _
   private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]
 
+  protected def extraConf: Seq[String] = Nil
+
   protected def serverStartCommand(port: Int) = {
     val portConf = if (mode == ServerMode.binary) {
       ConfVars.HIVE_SERVER2_THRIFT_PORT
@@ -635,6 +683,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
        |  --driver-class-path $driverClassPath
        |  --driver-java-options -Dlog4j.debug
        |  --conf spark.ui.enabled=false
+       |  ${extraConf.mkString("\n")}
      """.stripMargin.split("\\s+").toSeq
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 2004f24ad2..c0bb5af7d5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -190,6 +190,9 @@ class HiveContext private[hive](
    */
   protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)
 
+  protected[hive] def hiveThriftServerSingleSession: Boolean =
+    sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean
+
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
 
-- 
GitLab