From 7c41d135709c148d4fa3a1b06b5905715c970519 Mon Sep 17 00:00:00 2001
From: wangfei <wangfei1@huawei.com>
Date: Fri, 31 Oct 2014 11:27:59 -0700
Subject: [PATCH] [SPARK-3826][SQL]enable hive-thriftserver to support
 hive-0.13.1

 In #2241 hive-thriftserver is not enabled. This patch enable hive-thriftserver to support hive-0.13.1 by using a shim layer refer to #2241.

 1 A light shim layer(code in sql/hive-thriftserver/hive-version) for each different hive version to handle api compatibility

 2 New pom profiles "hive-default" and "hive-versions"(copy from #2241) to activate different hive version

 3 SBT cmd for different version as follows:
   hive-0.12.0 --- sbt/sbt -Phive,hadoop-2.3 -Phive-0.12.0 assembly
   hive-0.13.1 --- sbt/sbt -Phive,hadoop-2.3 -Phive-0.13.1 assembly

 4 Since hive-thriftserver depend on hive subproject, this patch should be merged with #2241 to enable hive-0.13.1 for hive-thriftserver

Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>

Closes #2685 from scwf/shim-thriftserver1 and squashes the following commits:

f26f3be [wangfei] remove clean to save time
f5cac74 [wangfei] remove local hivecontext test
578234d [wangfei] use new shaded hive
18fb1ff [wangfei] exclude kryo in hive pom
fa21d09 [wangfei] clean package assembly/assembly
8a4daf2 [wangfei] minor fix
0d7f6cf [wangfei] address comments
f7c93ae [wangfei] adding build with hive 0.13 before running tests
bcf943f [wangfei] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver1
c359822 [wangfei] reuse getCommandProcessor in hiveshim
52674a4 [scwf] sql/hive included since examples depend on it
3529e98 [scwf] move hive module to hive profile
f51ff4e [wangfei] update and fix conflicts
f48d3a5 [scwf] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver1
41f727b [scwf] revert pom changes
13afde0 [scwf] fix small bug
4b681f4 [scwf] enable thriftserver in profile hive-0.13.1
0bc53aa [scwf] fixed when result filed is null
dfd1c63 [scwf] update run-tests to run hive-0.12.0 default now
c6da3ce [scwf] Merge branch 'master' of https://github.com/apache/spark into shim-thriftserver
7c66b8e [scwf] update pom according spark-2706
ae47489 [scwf] update and fix conflicts
---
 assembly/pom.xml                              |   6 -
 dev/run-tests                                 |  13 +-
 pom.xml                                       |  29 +-
 python/pyspark/sql.py                         |  27 --
 sql/hive-thriftserver/pom.xml                 |  18 ++
 ...ver.scala => AbstractSparkSQLDriver.scala} |  18 +-
 .../hive/thriftserver/SparkSQLCLIDriver.scala |   6 +-
 .../thriftserver/SparkSQLCLIService.scala     |  19 +-
 .../server/SparkSQLOperationManager.scala     | 169 +----------
 .../spark/sql/hive/thriftserver/Shim12.scala  | 225 +++++++++++++++
 .../spark/sql/hive/thriftserver/Shim13.scala  | 267 ++++++++++++++++++
 sql/hive/pom.xml                              |   4 +
 12 files changed, 571 insertions(+), 230 deletions(-)
 rename sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/{SparkSQLDriver.scala => AbstractSparkSQLDriver.scala} (86%)
 create mode 100644 sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
 create mode 100644 sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala

diff --git a/assembly/pom.xml b/assembly/pom.xml
index 11d4bea936..9e8525dd46 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -201,12 +201,6 @@
           <artifactId>spark-hive_${scala.binary.version}</artifactId>
           <version>${project.version}</version>
         </dependency>
-      </dependencies>
-    </profile>
-    <profile>
-      <!-- TODO: Move this to "hive" profile once 0.13 JDBC is supported -->
-      <id>hive-0.12.0</id>
-      <dependencies>
         <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
diff --git a/dev/run-tests b/dev/run-tests
index 972c8c8a21..0e9eefa76a 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -142,17 +142,24 @@ CURRENT_BLOCK=$BLOCK_BUILD
   # We always build with Hive because the PySpark Spark SQL tests need it.
   BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0"
 
-  echo "[info] Building Spark with these arguments: $BUILD_MVN_PROFILE_ARGS"
 
   # NOTE: echo "q" is needed because sbt on encountering a build file with failure
   #+ (either resolution or compilation) prompts the user for input either q, r, etc
   #+ to quit or retry. This echo is there to make it not block.
-  # NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS or else it will be interpreted as a 
+  # NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS or else it will be interpreted as a
   #+ single argument!
   # QUESTION: Why doesn't 'yes "q"' work?
   # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work?
+  # First build with 0.12 to ensure patches do not break the hive 12 build
+  echo "[info] Compile with hive 0.12"
   echo -e "q\n" \
-    | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean package assembly/assembly \
+    | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean hive/compile hive-thriftserver/compile \
+    | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
+
+  # Then build with default version(0.13.1) because tests are based on this version
+  echo "[info] Building Spark with these arguments: $SBT_MAVEN_PROFILES_ARGS -Phive"
+  echo -e "q\n" \
+    | sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive package assembly/assembly  \
     | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
 }
 
diff --git a/pom.xml b/pom.xml
index 379274d0b1..42fdbb9e09 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,7 +129,7 @@
     <flume.version>1.4.0</flume.version>
     <zookeeper.version>3.4.5</zookeeper.version>
     <!-- Version used in Maven Hive dependency -->
-    <hive.version>0.13.1</hive.version>
+    <hive.version>0.13.1a</hive.version>
     <!-- Version used for internal directory structure -->
     <hive.version.short>0.13.1</hive.version.short>
     <derby.version>10.10.1.1</derby.version>
@@ -240,6 +240,18 @@
         <enabled>false</enabled>
       </snapshots>
     </repository>
+    <repository>
+      <!-- This is temporarily included to fix issues with Hive 0.13 -->
+      <id>spark-staging-hive13</id>
+      <name>Spring Staging Repository Hive 13</name>
+      <url>https://oss.sonatype.org/content/repositories/orgspark-project-1089/</url>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
   </repositories>
   <pluginRepositories>
     <pluginRepository>
@@ -908,9 +920,9 @@
                  by Spark SQL for code generation. -->
             <compilerPlugins>
               <compilerPlugin>
-                  <groupId>org.scalamacros</groupId>
-                  <artifactId>paradise_${scala.version}</artifactId>
-                  <version>${scala.macros.version}</version>
+                <groupId>org.scalamacros</groupId>
+                <artifactId>paradise_${scala.version}</artifactId>
+                <version>${scala.macros.version}</version>
               </compilerPlugin>
             </compilerPlugins>
           </configuration>
@@ -1314,14 +1326,19 @@
       </dependencies>
     </profile>
     <profile>
-      <id>hive-0.12.0</id>
+      <id>hive</id>
       <activation>
         <activeByDefault>false</activeByDefault>
       </activation>
-      <!-- TODO: Move this to "hive" profile once 0.13 JDBC is supported -->
       <modules>
         <module>sql/hive-thriftserver</module>
       </modules>
+    </profile>
+    <profile>
+      <id>hive-0.12.0</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
       <properties>
         <hive.version>0.12.0-protobuf-2.5</hive.version>
         <hive.version.short>0.12.0</hive.version.short>
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 93fd9d4909..f0bd3cbd98 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -1400,33 +1400,6 @@ class HiveContext(SQLContext):
 
 class LocalHiveContext(HiveContext):
 
-    """Starts up an instance of hive where metadata is stored locally.
-
-    An in-process metadata data is created with data stored in ./metadata.
-    Warehouse data is stored in in ./warehouse.
-
-    >>> import os
-    >>> hiveCtx = LocalHiveContext(sc)
-    >>> try:
-    ...     supress = hiveCtx.sql("DROP TABLE src")
-    ... except Exception:
-    ...     pass
-    >>> kv1 = os.path.join(os.environ["SPARK_HOME"],
-    ...        'examples/src/main/resources/kv1.txt')
-    >>> supress = hiveCtx.sql(
-    ...     "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-    >>> supress = hiveCtx.sql("LOAD DATA LOCAL INPATH '%s' INTO TABLE src"
-    ...        % kv1)
-    >>> results = hiveCtx.sql("FROM src SELECT value"
-    ...      ).map(lambda r: int(r.value.split('_')[1]))
-    >>> num = results.count()
-    >>> reduce_sum = results.reduce(lambda x, y: x + y)
-    >>> num
-    500
-    >>> reduce_sum
-    130091
-    """
-
     def __init__(self, sparkContext, sqlContext=None):
         HiveContext.__init__(self, sparkContext, sqlContext)
         warnings.warn("LocalHiveContext is deprecated. "
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index 124fc107cb..8db3010624 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -70,6 +70,24 @@
         <groupId>org.scalatest</groupId>
         <artifactId>scalatest-maven-plugin</artifactId>
       </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-default-sources</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>v${hive.version.short}/src/main/scala</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-deploy-plugin</artifactId>
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
similarity index 86%
rename from sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
rename to sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
index a5c457c677..fcb302edbf 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
@@ -29,11 +29,11 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
 import org.apache.spark.Logging
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
 
-private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext)
-  extends Driver with Logging {
+private[hive] abstract class AbstractSparkSQLDriver(
+    val context: HiveContext = SparkSQLEnv.hiveContext) extends Driver with Logging {
 
-  private var tableSchema: Schema = _
-  private var hiveResponse: Seq[String] = _
+  private[hive] var tableSchema: Schema = _
+  private[hive] var hiveResponse: Seq[String] = _
 
   override def init(): Unit = {
   }
@@ -74,16 +74,6 @@ private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveCo
 
   override def getSchema: Schema = tableSchema
 
-  override def getResults(res: JArrayList[String]): Boolean = {
-    if (hiveResponse == null) {
-      false
-    } else {
-      res.addAll(hiveResponse)
-      hiveResponse = null
-      true
-    }
-  }
-
   override def destroy() {
     super.destroy()
     hiveResponse = null
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 7ba4564602..2cd02ae926 100755
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -38,6 +38,8 @@ import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.thrift.transport.TSocket
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.hive.HiveShim
+import org.apache.spark.sql.hive.thriftserver.HiveThriftServerShim
 
 private[hive] object SparkSQLCLIDriver {
   private var prompt = "spark-sql"
@@ -116,7 +118,7 @@ private[hive] object SparkSQLCLIDriver {
       }
     }
 
-    if (!sessionState.isRemoteMode && !ShimLoader.getHadoopShims.usesJobShell()) {
+    if (!sessionState.isRemoteMode) {
       // Hadoop-20 and above - we need to augment classpath using hiveconf
       // components.
       // See also: code in ExecDriver.java
@@ -258,7 +260,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
     } else {
       var ret = 0
       val hconf = conf.asInstanceOf[HiveConf]
-      val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hconf)
+      val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hconf)
 
       if (proc != null) {
         if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor]) {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
index 42cbf363b2..a78311fc48 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
@@ -24,6 +24,7 @@ import java.util.{List => JList}
 import javax.security.auth.login.LoginException
 
 import org.apache.commons.logging.Log
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.hive.service.Service.STATE
@@ -44,15 +45,17 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
     val sparkSqlSessionManager = new SparkSQLSessionManager(hiveContext)
     setSuperField(this, "sessionManager", sparkSqlSessionManager)
     addService(sparkSqlSessionManager)
+    var sparkServiceUGI: UserGroupInformation = null
 
-    try {
-      HiveAuthFactory.loginFromKeytab(hiveConf)
-      val serverUserName = ShimLoader.getHadoopShims
-        .getShortUserName(ShimLoader.getHadoopShims.getUGIForConf(hiveConf))
-      setSuperField(this, "serverUserName", serverUserName)
-    } catch {
-      case e @ (_: IOException | _: LoginException) =>
-        throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
+    if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
+      try {
+        HiveAuthFactory.loginFromKeytab(hiveConf)
+        sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
+        HiveThriftServerShim.setServerUserName(sparkServiceUGI, this)
+      } catch {
+        case e @ (_: IOException | _: LoginException) =>
+          throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
+      }
     }
 
     initCompositeService(hiveConf)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index accf61576b..2a4f24132c 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -17,24 +17,15 @@
 
 package org.apache.spark.sql.hive.thriftserver.server
 
-import java.sql.Timestamp
 import java.util.{Map => JMap}
+import scala.collection.mutable.Map
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, Map}
-import scala.math.{random, round}
-
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
 import org.apache.hive.service.cli.session.HiveSession
 import org.apache.spark.Logging
-import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
-import org.apache.spark.sql.catalyst.plans.logical.SetCommand
-import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.thriftserver.{SparkExecuteStatementOperation, ReflectionUtils}
 
 /**
  * Executes queries using Spark SQL, and maintains a list of handles to active queries.
@@ -54,158 +45,8 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
       confOverlay: JMap[String, String],
       async: Boolean): ExecuteStatementOperation = synchronized {
 
-    val operation = new ExecuteStatementOperation(parentSession, statement, confOverlay) {
-      private var result: SchemaRDD = _
-      private var iter: Iterator[SparkRow] = _
-      private var dataTypes: Array[DataType] = _
-
-      def close(): Unit = {
-        // RDDs will be cleaned automatically upon garbage collection.
-        logDebug("CLOSING")
-      }
-
-      def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
-        if (!iter.hasNext) {
-          new RowSet()
-        } else {
-          // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
-          val maxRows = maxRowsL.toInt
-          var curRow = 0
-          var rowSet = new ArrayBuffer[Row](maxRows.min(1024))
-
-          while (curRow < maxRows && iter.hasNext) {
-            val sparkRow = iter.next()
-            val row = new Row()
-            var curCol = 0
-
-            while (curCol < sparkRow.length) {
-              if (sparkRow.isNullAt(curCol)) {
-                addNullColumnValue(sparkRow, row, curCol)
-              } else {
-                addNonNullColumnValue(sparkRow, row, curCol)
-              }
-              curCol += 1
-            }
-            rowSet += row
-            curRow += 1
-          }
-          new RowSet(rowSet, 0)
-        }
-      }
-
-      def addNonNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
-        dataTypes(ordinal) match {
-          case StringType =>
-            to.addString(from(ordinal).asInstanceOf[String])
-          case IntegerType =>
-            to.addColumnValue(ColumnValue.intValue(from.getInt(ordinal)))
-          case BooleanType =>
-            to.addColumnValue(ColumnValue.booleanValue(from.getBoolean(ordinal)))
-          case DoubleType =>
-            to.addColumnValue(ColumnValue.doubleValue(from.getDouble(ordinal)))
-          case FloatType =>
-            to.addColumnValue(ColumnValue.floatValue(from.getFloat(ordinal)))
-          case DecimalType =>
-            val hiveDecimal = from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
-            to.addColumnValue(ColumnValue.stringValue(new HiveDecimal(hiveDecimal)))
-          case LongType =>
-            to.addColumnValue(ColumnValue.longValue(from.getLong(ordinal)))
-          case ByteType =>
-            to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
-          case ShortType =>
-            to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
-          case TimestampType =>
-            to.addColumnValue(
-              ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
-          case BinaryType | _: ArrayType | _: StructType | _: MapType =>
-            val hiveString = result
-              .queryExecution
-              .asInstanceOf[HiveContext#QueryExecution]
-              .toHiveString((from.get(ordinal), dataTypes(ordinal)))
-            to.addColumnValue(ColumnValue.stringValue(hiveString))
-        }
-      }
-
-      def addNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
-        dataTypes(ordinal) match {
-          case StringType =>
-            to.addString(null)
-          case IntegerType =>
-            to.addColumnValue(ColumnValue.intValue(null))
-          case BooleanType =>
-            to.addColumnValue(ColumnValue.booleanValue(null))
-          case DoubleType =>
-            to.addColumnValue(ColumnValue.doubleValue(null))
-          case FloatType =>
-            to.addColumnValue(ColumnValue.floatValue(null))
-          case DecimalType =>
-            to.addColumnValue(ColumnValue.stringValue(null: HiveDecimal))
-          case LongType =>
-            to.addColumnValue(ColumnValue.longValue(null))
-          case ByteType =>
-            to.addColumnValue(ColumnValue.byteValue(null))
-          case ShortType =>
-            to.addColumnValue(ColumnValue.shortValue(null))
-          case TimestampType =>
-            to.addColumnValue(ColumnValue.timestampValue(null))
-          case BinaryType | _: ArrayType | _: StructType | _: MapType =>
-            to.addColumnValue(ColumnValue.stringValue(null: String))
-        }
-      }
-
-      def getResultSetSchema: TableSchema = {
-        logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
-        if (result.queryExecution.analyzed.output.size == 0) {
-          new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
-        } else {
-          val schema = result.queryExecution.analyzed.output.map { attr =>
-            new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
-          }
-          new TableSchema(schema)
-        }
-      }
-
-      def run(): Unit = {
-        logInfo(s"Running query '$statement'")
-        setState(OperationState.RUNNING)
-        try {
-          result = hiveContext.sql(statement)
-          logDebug(result.queryExecution.toString())
-          result.queryExecution.logical match {
-            case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
-              sessionToActivePool(parentSession) = value
-              logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
-            case _ =>
-          }
-
-          val groupId = round(random * 1000000).toString
-          hiveContext.sparkContext.setJobGroup(groupId, statement)
-          sessionToActivePool.get(parentSession).foreach { pool =>
-            hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
-          }
-          iter = {
-            val resultRdd = result.queryExecution.toRdd
-            val useIncrementalCollect =
-              hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
-            if (useIncrementalCollect) {
-              resultRdd.toLocalIterator
-            } else {
-              resultRdd.collect().iterator
-            }
-          }
-          dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
-          setHasResultSet(true)
-        } catch {
-          // Actually do need to catch Throwable as some failures don't inherit from Exception and
-          // HiveServer will silently swallow them.
-          case e: Throwable =>
-            logError("Error executing query:",e)
-            throw new HiveSQLException(e.toString)
-        }
-        setState(OperationState.FINISHED)
-      }
-    }
-
+    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay)(
+      hiveContext, sessionToActivePool)
    handleToOperation.put(operation.getHandle, operation)
    operation
   }
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
new file mode 100644
index 0000000000..bbd727c686
--- /dev/null
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import java.sql.Timestamp
+import java.util.{ArrayList => JArrayList, Map => JMap}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map => SMap}
+import scala.math._
+
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
+import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hive.service.cli._
+import org.apache.hive.service.cli.operation.ExecuteStatementOperation
+import org.apache.hive.service.cli.session.HiveSession
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.plans.logical.SetCommand
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
+import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
+import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+
+/**
+ * A compatibility layer for interacting with Hive version 0.12.0.
+ */
+private[thriftserver] object HiveThriftServerShim {
+  val version = "0.12.0"
+
+  def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = {
+    val serverUserName = ShimLoader.getHadoopShims.getShortUserName(sparkServiceUGI)
+    setSuperField(sparkCliService, "serverUserName", serverUserName)
+  }
+}
+
+private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveContext)
+  extends AbstractSparkSQLDriver(_context) {
+  override def getResults(res: JArrayList[String]): Boolean = {
+    if (hiveResponse == null) {
+      false
+    } else {
+      res.addAll(hiveResponse)
+      hiveResponse = null
+      true
+    }
+  }
+}
+
+private[hive] class SparkExecuteStatementOperation(
+    parentSession: HiveSession,
+    statement: String,
+    confOverlay: JMap[String, String])(
+    hiveContext: HiveContext,
+    sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
+  parentSession, statement, confOverlay) with Logging {
+  private var result: SchemaRDD = _
+  private var iter: Iterator[SparkRow] = _
+  private var dataTypes: Array[DataType] = _
+
+  def close(): Unit = {
+    // RDDs will be cleaned automatically upon garbage collection.
+    logDebug("CLOSING")
+  }
+
+  def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
+    if (!iter.hasNext) {
+      new RowSet()
+    } else {
+      // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
+      val maxRows = maxRowsL.toInt
+      var curRow = 0
+      var rowSet = new ArrayBuffer[Row](maxRows.min(1024))
+
+      while (curRow < maxRows && iter.hasNext) {
+        val sparkRow = iter.next()
+        val row = new Row()
+        var curCol = 0
+
+        while (curCol < sparkRow.length) {
+          if (sparkRow.isNullAt(curCol)) {
+            addNullColumnValue(sparkRow, row, curCol)
+          } else {
+            addNonNullColumnValue(sparkRow, row, curCol)
+          }
+          curCol += 1
+        }
+        rowSet += row
+        curRow += 1
+      }
+      new RowSet(rowSet, 0)
+    }
+  }
+
+  def addNonNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
+    dataTypes(ordinal) match {
+      case StringType =>
+        to.addString(from(ordinal).asInstanceOf[String])
+      case IntegerType =>
+        to.addColumnValue(ColumnValue.intValue(from.getInt(ordinal)))
+      case BooleanType =>
+        to.addColumnValue(ColumnValue.booleanValue(from.getBoolean(ordinal)))
+      case DoubleType =>
+        to.addColumnValue(ColumnValue.doubleValue(from.getDouble(ordinal)))
+      case FloatType =>
+        to.addColumnValue(ColumnValue.floatValue(from.getFloat(ordinal)))
+      case DecimalType =>
+        val hiveDecimal = from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
+        to.addColumnValue(ColumnValue.stringValue(new HiveDecimal(hiveDecimal)))
+      case LongType =>
+        to.addColumnValue(ColumnValue.longValue(from.getLong(ordinal)))
+      case ByteType =>
+        to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
+      case ShortType =>
+        to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
+      case TimestampType =>
+        to.addColumnValue(
+          ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
+      case BinaryType | _: ArrayType | _: StructType | _: MapType =>
+        val hiveString = result
+          .queryExecution
+          .asInstanceOf[HiveContext#QueryExecution]
+          .toHiveString((from.get(ordinal), dataTypes(ordinal)))
+        to.addColumnValue(ColumnValue.stringValue(hiveString))
+    }
+  }
+
+  def addNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
+    dataTypes(ordinal) match {
+      case StringType =>
+        to.addString(null)
+      case IntegerType =>
+        to.addColumnValue(ColumnValue.intValue(null))
+      case BooleanType =>
+        to.addColumnValue(ColumnValue.booleanValue(null))
+      case DoubleType =>
+        to.addColumnValue(ColumnValue.doubleValue(null))
+      case FloatType =>
+        to.addColumnValue(ColumnValue.floatValue(null))
+      case DecimalType =>
+        to.addColumnValue(ColumnValue.stringValue(null: HiveDecimal))
+      case LongType =>
+        to.addColumnValue(ColumnValue.longValue(null))
+      case ByteType =>
+        to.addColumnValue(ColumnValue.byteValue(null))
+      case ShortType =>
+        to.addColumnValue(ColumnValue.shortValue(null))
+      case TimestampType =>
+        to.addColumnValue(ColumnValue.timestampValue(null))
+      case BinaryType | _: ArrayType | _: StructType | _: MapType =>
+        to.addColumnValue(ColumnValue.stringValue(null: String))
+    }
+  }
+
+  def getResultSetSchema: TableSchema = {
+    logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
+    if (result.queryExecution.analyzed.output.size == 0) {
+      new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
+    } else {
+      val schema = result.queryExecution.analyzed.output.map { attr =>
+        new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
+      }
+      new TableSchema(schema)
+    }
+  }
+
+  def run(): Unit = {
+    logInfo(s"Running query '$statement'")
+    setState(OperationState.RUNNING)
+    try {
+      result = hiveContext.sql(statement)
+      logDebug(result.queryExecution.toString())
+      result.queryExecution.logical match {
+        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
+          sessionToActivePool(parentSession) = value
+          logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
+        case _ =>
+      }
+
+      val groupId = round(random * 1000000).toString
+      hiveContext.sparkContext.setJobGroup(groupId, statement)
+      sessionToActivePool.get(parentSession).foreach { pool =>
+        hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+      }
+      iter = {
+        val resultRdd = result.queryExecution.toRdd
+        val useIncrementalCollect =
+          hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
+        if (useIncrementalCollect) {
+          resultRdd.toLocalIterator
+        } else {
+          resultRdd.collect().iterator
+        }
+      }
+      dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
+      setHasResultSet(true)
+    } catch {
+      // Actually do need to catch Throwable as some failures don't inherit from Exception and
+      // HiveServer will silently swallow them.
+      case e: Throwable =>
+        logError("Error executing query:",e)
+        throw new HiveSQLException(e.toString)
+    }
+    setState(OperationState.FINISHED)
+  }
+}
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
new file mode 100644
index 0000000000..e59681bfbe
--- /dev/null
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import java.security.PrivilegedExceptionAction
+import java.sql.Timestamp
+import java.util.concurrent.Future
+import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map => SMap}
+import scala.math._
+
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hive.service.cli._
+import org.apache.hive.service.cli.operation.ExecuteStatementOperation
+import org.apache.hive.service.cli.session.HiveSession
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.{Row => SparkRow, SchemaRDD}
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+
+/**
+ * A compatibility layer for interacting with Hive version 0.12.0.
+ */
+private[thriftserver] object HiveThriftServerShim {
+  val version = "0.13.1"
+
+  def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = {
+    setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)
+  }
+}
+
+private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveContext)
+  extends AbstractSparkSQLDriver(_context) {
+  override def getResults(res: JList[_]): Boolean = {
+    if (hiveResponse == null) {
+      false
+    } else {
+      res.asInstanceOf[JArrayList[String]].addAll(hiveResponse)
+      hiveResponse = null
+      true
+    }
+  }
+}
+
+private[hive] class SparkExecuteStatementOperation(
+    parentSession: HiveSession,
+    statement: String,
+    confOverlay: JMap[String, String],
+    runInBackground: Boolean = true)(
+    hiveContext: HiveContext,
+    sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
+  parentSession, statement, confOverlay, runInBackground) with Logging {
+
+  private var result: SchemaRDD = _
+  private var iter: Iterator[SparkRow] = _
+  private var dataTypes: Array[DataType] = _
+
+  private def runInternal(cmd: String) = {
+    try {
+      result = hiveContext.sql(cmd)
+      logDebug(result.queryExecution.toString())
+      val groupId = round(random * 1000000).toString
+      hiveContext.sparkContext.setJobGroup(groupId, statement)
+      iter = {
+        val resultRdd = result.queryExecution.toRdd
+        val useIncrementalCollect =
+          hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
+        if (useIncrementalCollect) {
+          resultRdd.toLocalIterator
+        } else {
+          resultRdd.collect().iterator
+        }
+      }
+      dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
+    } catch {
+      // Actually do need to catch Throwable as some failures don't inherit from Exception and
+      // HiveServer will silently swallow them.
+      case e: Throwable =>
+        logError("Error executing query:",e)
+        throw new HiveSQLException(e.toString)
+    }
+  }
+
+  def close(): Unit = {
+    // RDDs will be cleaned automatically upon garbage collection.
+    logDebug("CLOSING")
+  }
+
+  def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any],  ordinal: Int) {
+    dataTypes(ordinal) match {
+      case StringType =>
+        to += from.get(ordinal).asInstanceOf[String]
+      case IntegerType =>
+        to += from.getInt(ordinal)
+      case BooleanType =>
+        to += from.getBoolean(ordinal)
+      case DoubleType =>
+        to += from.getDouble(ordinal)
+      case FloatType =>
+        to += from.getFloat(ordinal)
+      case DecimalType =>
+        to += from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
+      case LongType =>
+        to += from.getLong(ordinal)
+      case ByteType =>
+        to += from.getByte(ordinal)
+      case ShortType =>
+        to += from.getShort(ordinal)
+      case TimestampType =>
+        to +=  from.get(ordinal).asInstanceOf[Timestamp]
+      case BinaryType =>
+        to += from.get(ordinal).asInstanceOf[String]
+      case _: ArrayType =>
+        to += from.get(ordinal).asInstanceOf[String]
+      case _: StructType =>
+        to += from.get(ordinal).asInstanceOf[String]
+      case _: MapType =>
+        to += from.get(ordinal).asInstanceOf[String]
+    }
+  }
+
+  def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
+    validateDefaultFetchOrientation(order)
+    assertState(OperationState.FINISHED)
+    setHasResultSet(true)
+    val reultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
+    if (!iter.hasNext) {
+      reultRowSet
+    } else {
+      // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
+      val maxRows = maxRowsL.toInt
+      var curRow = 0
+      while (curRow < maxRows && iter.hasNext) {
+        val sparkRow = iter.next()
+        val row = ArrayBuffer[Any]()
+        var curCol = 0
+        while (curCol < sparkRow.length) {
+          if (sparkRow.isNullAt(curCol)) {
+            row += null
+          } else {
+            addNonNullColumnValue(sparkRow, row, curCol)
+          }
+          curCol += 1
+        }
+        reultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
+        curRow += 1
+      }
+      reultRowSet
+    }
+  }
+
+  def getResultSetSchema: TableSchema = {
+    logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
+    if (result.queryExecution.analyzed.output.size == 0) {
+      new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
+    } else {
+      val schema = result.queryExecution.analyzed.output.map { attr =>
+        new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
+      }
+      new TableSchema(schema)
+    }
+  }
+
+  private def getConfigForOperation: HiveConf = {
+    var sqlOperationConf: HiveConf = getParentSession.getHiveConf
+    if (!getConfOverlay.isEmpty || shouldRunAsync) {
+      sqlOperationConf = new HiveConf(sqlOperationConf)
+      import scala.collection.JavaConversions._
+      for (confEntry <- getConfOverlay.entrySet) {
+        try {
+          sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue)
+        }
+        catch {
+          case e: IllegalArgumentException => {
+            throw new HiveSQLException("Error applying statement specific settings", e)
+          }
+        }
+      }
+    }
+    return sqlOperationConf
+  }
+
+  def run(): Unit = {
+    logInfo(s"Running query '$statement'")
+    val opConfig: HiveConf = getConfigForOperation
+    setState(OperationState.RUNNING)
+    setHasResultSet(true)
+
+    if (!shouldRunAsync) {
+      runInternal(statement)
+      setState(OperationState.FINISHED)
+    } else {
+      val parentSessionState = SessionState.get
+      val sessionHive: Hive = Hive.get
+      val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig)
+
+      val backgroundOperation: Runnable = new Runnable {
+        def run {
+          val doAsAction: PrivilegedExceptionAction[AnyRef] =
+            new PrivilegedExceptionAction[AnyRef] {
+              def run: AnyRef = {
+                Hive.set(sessionHive)
+                SessionState.setCurrentSessionState(parentSessionState)
+                try {
+                  runInternal(statement)
+                }
+                catch {
+                  case e: HiveSQLException => {
+                    setOperationException(e)
+                    logError("Error running hive query: ", e)
+                  }
+                }
+                return null
+              }
+            }
+          try {
+            ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction)
+          }
+          catch {
+            case e: Exception => {
+              setOperationException(new HiveSQLException(e))
+              logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
+            }
+          }
+          setState(OperationState.FINISHED)
+        }
+      }
+
+      try {
+        val backgroundHandle: Future[_] = getParentSession.getSessionManager.
+          submitBackgroundOperation(backgroundOperation)
+        setBackgroundHandle(backgroundHandle)
+      } catch {
+        // Actually do need to catch Throwable as some failures don't inherit from Exception and
+        // HiveServer will silently swallow them.
+        case e: Throwable =>
+          logError("Error executing query:",e)
+          throw new HiveSQLException(e.toString)
+      }
+    }
+  }
+}
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index db01363b4d..67e36a951e 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -65,6 +65,10 @@
           <groupId>commons-logging</groupId>
           <artifactId>commons-logging</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.esotericsoftware.kryo</groupId>
+          <artifactId>kryo</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
-- 
GitLab