From 0526fea483066086dfc27d1606f74220fe822f7f Mon Sep 17 00:00:00 2001
From: Cheolsoo Park <cheolsoop@netflix.com>
Date: Thu, 4 Jun 2015 13:27:35 -0700
Subject: [PATCH] [SPARK-6909][SQL] Remove Hive Shim code

This is a follow-up on #6393. I am removing the following files in this PR.
```
./sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
./sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
```
Basically, I re-factored the shim code as follows-
* Rewrote code directly with Hive 0.13 methods, or
* Converted code into private methods, or
* Extracted code into separate classes

But for leftover code that didn't fit in any of these cases, I created a HiveShim object. For eg, helper functions which wrap Hive 0.13 methods to work around Hive bugs are placed here.

Author: Cheolsoo Park <cheolsoop@netflix.com>

Closes #6604 from piaozhexiu/SPARK-6909 and squashes the following commits:

5dccc20 [Cheolsoo Park] Remove hive shim code
---
 .../hive/thriftserver/HiveThriftServer2.scala |  10 +-
 .../SparkExecuteStatementOperation.scala}     | 102 +---
 .../hive/thriftserver/SparkSQLCLIDriver.scala |   6 +-
 .../thriftserver/SparkSQLCLIService.scala     |   7 +-
 ...rkSQLDriver.scala => SparkSQLDriver.scala} |  20 +-
 .../sql/hive/thriftserver/SparkSQLEnv.scala   |   4 +-
 .../thriftserver/SparkSQLSessionManager.scala |  75 +++
 .../HiveThriftServer2Suites.scala             |   8 +-
 .../execution/HiveCompatibilitySuite.scala    |   3 +-
 .../apache/spark/sql/hive/HiveContext.scala   |  23 +-
 .../spark/sql/hive/HiveInspectors.scala       | 187 +++++--
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  22 +-
 .../org/apache/spark/sql/hive/HiveQl.scala    |   4 +-
 .../org/apache/spark/sql/hive/HiveShim.scala  | 247 ++++++++++
 .../apache/spark/sql/hive/TableReader.scala   |  11 +-
 .../hive/execution/InsertIntoHiveTable.scala  |  12 +-
 .../org/apache/spark/sql/hive/hiveUdfs.scala  |   1 +
 .../spark/sql/hive/hiveWriterContainers.scala |   3 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala  |  46 +-
 .../spark/sql/hive/StatisticsSuite.scala      |   4 -
 .../sql/hive/execution/HiveQuerySuite.scala   |  25 +-
 .../sql/hive/execution/SQLQuerySuite.scala    |  58 ++-
 .../org/apache/spark/sql/hive/Shim13.scala    | 457 ------------------
 23 files changed, 619 insertions(+), 716 deletions(-)
 rename sql/hive-thriftserver/{v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala => src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala} (66%)
 rename sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/{AbstractSparkSQLDriver.scala => SparkSQLDriver.scala} (86%)
 create mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
 delete mode 100644 sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 94687eeda4..5b391d3dce 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -17,9 +17,6 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.commons.logging.LogFactory
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
@@ -29,12 +26,15 @@ import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
 import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
-import org.apache.spark.sql.hive.{HiveContext, HiveShim}
 import org.apache.spark.util.Utils
 import org.apache.spark.{Logging, SparkContext}
 
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
 /**
  * The main entry point for the Spark SQL port of HiveServer2.  Starts up a `SparkSQLContext` and a
  * `HiveThriftServer2` thrift server.
@@ -51,7 +51,7 @@ object HiveThriftServer2 extends Logging {
   @DeveloperApi
   def startWithContext(sqlContext: HiveContext): Unit = {
     val server = new HiveThriftServer2(sqlContext)
-    sqlContext.setConf("spark.sql.hive.version", HiveShim.version)
+    sqlContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
     server.init(sqlContext.hiveconf)
     server.start()
     listener = new HiveThriftServer2Listener(server, sqlContext.conf)
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
similarity index 66%
rename from sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
rename to sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index b9d4f1c58c..c0d1266212 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -18,66 +18,31 @@
 package org.apache.spark.sql.hive.thriftserver
 
 import java.sql.{Date, Timestamp}
-import java.util.concurrent.Executors
-import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, UUID}
-
-import org.apache.commons.logging.Log
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hive.service.cli.thrift.TProtocolVersion
-import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, Map => SMap}
+import java.util.{Map => JMap, UUID}
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
-import org.apache.hive.service.cli.session.{SessionManager, HiveSession}
+import org.apache.hive.service.cli.session.HiveSession
 
-import org.apache.spark.{SparkContext, Logging}
-import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
+import org.apache.spark.Logging
 import org.apache.spark.sql.execution.SetCommand
-import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
 
-/**
- * A compatibility layer for interacting with Hive version 0.13.1.
- */
-private[thriftserver] object HiveThriftServerShim {
-  val version = "0.13.1"
-
-  def setServerUserName(
-      sparkServiceUGI: UserGroupInformation,
-      sparkCliService:SparkSQLCLIService) = {
-    setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)
-  }
-}
-
-private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveContext)
-  extends AbstractSparkSQLDriver(_context) {
-  override def getResults(res: JList[_]): Boolean = {
-    if (hiveResponse == null) {
-      false
-    } else {
-      res.asInstanceOf[JArrayList[String]].addAll(hiveResponse)
-      hiveResponse = null
-      true
-    }
-  }
-}
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map => SMap}
 
 private[hive] class SparkExecuteStatementOperation(
     parentSession: HiveSession,
     statement: String,
     confOverlay: JMap[String, String],
-    runInBackground: Boolean = true)(
-    hiveContext: HiveContext,
-    sessionToActivePool: SMap[SessionHandle, String])
+    runInBackground: Boolean = true)
+    (hiveContext: HiveContext, sessionToActivePool: SMap[SessionHandle, String])
   // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
-  extends ExecuteStatementOperation(parentSession, statement, confOverlay, false) with Logging {
+  extends ExecuteStatementOperation(parentSession, statement, confOverlay, false)
+  with Logging {
 
   private var result: DataFrame = _
   private var iter: Iterator[SparkRow] = _
@@ -88,7 +53,7 @@ private[hive] class SparkExecuteStatementOperation(
     logDebug("CLOSING")
   }
 
-  def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any],  ordinal: Int) {
+  def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
     dataTypes(ordinal) match {
       case StringType =>
         to += from.getString(ordinal)
@@ -209,48 +174,3 @@ private[hive] class SparkExecuteStatementOperation(
     HiveThriftServer2.listener.onStatementFinish(statementId)
   }
 }
-
-private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
-  extends SessionManager
-  with ReflectedCompositeService {
-
-  private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
-
-  override def init(hiveConf: HiveConf) {
-    setSuperField(this, "hiveConf", hiveConf)
-
-    val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
-    setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
-    getAncestorField[Log](this, 3, "LOG").info(
-      s"HiveServer2: Async execution pool size $backgroundPoolSize")
-
-    setSuperField(this, "operationManager", sparkSqlOperationManager)
-    addService(sparkSqlOperationManager)
-
-    initCompositeService(hiveConf)
-  }
-
-  override def openSession(
-      protocol: TProtocolVersion,
-      username: String,
-      passwd: String,
-      sessionConf: java.util.Map[String, String],
-      withImpersonation: Boolean,
-      delegationToken: String): SessionHandle = {
-    hiveContext.openSession()
-    val sessionHandle = super.openSession(
-      protocol, username, passwd, sessionConf, withImpersonation, delegationToken)
-    val session = super.getSession(sessionHandle)
-    HiveThriftServer2.listener.onSessionCreated(
-      session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
-    sessionHandle
-  }
-
-  override def closeSession(sessionHandle: SessionHandle) {
-    HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
-    super.closeSession(sessionHandle)
-    sparkSqlOperationManager.sessionToActivePool -= sessionHandle
-
-    hiveContext.detachSession()
-  }
-}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 14f6f658d9..039cfa40d2 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -32,12 +32,12 @@ import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.exec.Utilities
-import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, SetProcessor, CommandProcessor}
+import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, SetProcessor, CommandProcessor, CommandProcessorFactory}
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.thrift.transport.TSocket
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.hive.{HiveContext, HiveShim}
+import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.util.Utils
 
 private[hive] object SparkSQLCLIDriver {
@@ -267,7 +267,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
     } else {
       var ret = 0
       val hconf = conf.asInstanceOf[HiveConf]
-      val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hconf)
+      val proc: CommandProcessor = CommandProcessorFactory.get(Array(tokens(0)), hconf)
 
       if (proc != null) {
         if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor] ||
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
index 499e077d72..41f647d5f8 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
@@ -21,8 +21,6 @@ import java.io.IOException
 import java.util.{List => JList}
 import javax.security.auth.login.LoginException
 
-import scala.collection.JavaConversions._
-
 import org.apache.commons.logging.Log
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.shims.ShimLoader
@@ -34,7 +32,8 @@ import org.apache.hive.service.{AbstractService, Service, ServiceException}
 
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
-import org.apache.spark.util.Utils
+
+import scala.collection.JavaConversions._
 
 private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
   extends CLIService
@@ -52,7 +51,7 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
       try {
         HiveAuthFactory.loginFromKeytab(hiveConf)
         sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
-        HiveThriftServerShim.setServerUserName(sparkServiceUGI, this)
+        setSuperField(this, "serviceUGI", sparkServiceUGI)
       } catch {
         case e @ (_: IOException | _: LoginException) =>
           throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
similarity index 86%
rename from sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
rename to sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 48ac9062af..77272aecf2 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import scala.collection.JavaConversions._
+import java.util.{ArrayList => JArrayList, List => JList}
 
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
@@ -27,8 +27,12 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
 import org.apache.spark.Logging
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
 
-private[hive] abstract class AbstractSparkSQLDriver(
-    val context: HiveContext = SparkSQLEnv.hiveContext) extends Driver with Logging {
+import scala.collection.JavaConversions._
+
+private[hive] class SparkSQLDriver(
+    val context: HiveContext = SparkSQLEnv.hiveContext)
+  extends Driver
+  with Logging {
 
   private[hive] var tableSchema: Schema = _
   private[hive] var hiveResponse: Seq[String] = _
@@ -71,6 +75,16 @@ private[hive] abstract class AbstractSparkSQLDriver(
     0
   }
 
+  override def getResults(res: JList[_]): Boolean = {
+    if (hiveResponse == null) {
+      false
+    } else {
+      res.asInstanceOf[JArrayList[String]].addAll(hiveResponse)
+      hiveResponse = null
+      true
+    }
+  }
+
   override def getSchema: Schema = tableSchema
 
   override def destroy() {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 7c0c505e2d..79eda1f512 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -22,7 +22,7 @@ import java.io.PrintStream
 import scala.collection.JavaConversions._
 
 import org.apache.spark.scheduler.StatsReportListener
-import org.apache.spark.sql.hive.{HiveShim, HiveContext}
+import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.apache.spark.util.Utils
 
@@ -56,7 +56,7 @@ private[hive] object SparkSQLEnv extends Logging {
       hiveContext.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
       hiveContext.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
 
-      hiveContext.setConf("spark.sql.hive.version", HiveShim.version)
+      hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
 
       if (log.isDebugEnabled) {
         hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) =>
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
new file mode 100644
index 0000000000..357b27f740
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import java.util.concurrent.Executors
+
+import org.apache.commons.logging.Log
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hive.service.cli.SessionHandle
+import org.apache.hive.service.cli.session.SessionManager
+import org.apache.hive.service.cli.thrift.TProtocolVersion
+
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
+
+private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
+  extends SessionManager
+  with ReflectedCompositeService {
+
+  private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
+
+  override def init(hiveConf: HiveConf) {
+    setSuperField(this, "hiveConf", hiveConf)
+
+    val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
+    setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
+    getAncestorField[Log](this, 3, "LOG").info(
+      s"HiveServer2: Async execution pool size $backgroundPoolSize")
+
+    setSuperField(this, "operationManager", sparkSqlOperationManager)
+    addService(sparkSqlOperationManager)
+
+    initCompositeService(hiveConf)
+  }
+
+  override def openSession(protocol: TProtocolVersion,
+                           username: String,
+                           passwd: String,
+                           sessionConf: java.util.Map[String, String],
+                           withImpersonation: Boolean,
+                           delegationToken: String): SessionHandle = {
+    hiveContext.openSession()
+    val sessionHandle = super.openSession(
+      protocol, username, passwd, sessionConf, withImpersonation, delegationToken)
+    val session = super.getSession(sessionHandle)
+    HiveThriftServer2.listener.onSessionCreated(
+      session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
+    sessionHandle
+  }
+
+  override def closeSession(sessionHandle: SessionHandle) {
+    HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString)
+    super.closeSession(sessionHandle)
+    sparkSqlOperationManager.sessionToActivePool -= sessionHandle
+
+    hiveContext.detachSession()
+  }
+}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index a93a3dee43..f57c7083ea 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -40,7 +40,7 @@ import org.apache.thrift.transport.TSocket
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.sql.hive.HiveShim
+import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.util.Utils
 
 object TestData {
@@ -111,7 +111,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
     withJdbcStatement { statement =>
       val resultSet = statement.executeQuery("SET spark.sql.hive.version")
       resultSet.next()
-      assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}")
+      assert(resultSet.getString(1) ===
+        s"spark.sql.hive.version=${HiveContext.hiveExecutionVersion}")
     }
   }
 
@@ -365,7 +366,8 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
     withJdbcStatement { statement =>
       val resultSet = statement.executeQuery("SET spark.sql.hive.version")
       resultSet.next()
-      assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}")
+      assert(resultSet.getString(1) ===
+        s"spark.sql.hive.version=${HiveContext.hiveExecutionVersion}")
     }
   }
 }
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 0b1917a392..048f78b4da 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -23,7 +23,6 @@ import java.util.{Locale, TimeZone}
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.SQLConf
-import org.apache.spark.sql.hive.HiveShim
 import org.apache.spark.sql.hive.test.TestHive
 
 /**
@@ -254,7 +253,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
 
     // the answer is sensitive for jdk version
     "udf_java_method"
-  ) ++ HiveShim.compatibilityBlackList
+  )
 
   /**
    * The set of tests that are believed to be working in catalyst. Tests not on whiteList or
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index fbf2c7d8cb..800f51c5e2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -17,37 +17,34 @@
 
 package org.apache.spark.sql.hive
 
-import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
+import java.io.File
 import java.net.{URL, URLClassLoader}
 import java.sql.Timestamp
-import java.util.{ArrayList => JArrayList}
 
-import org.apache.hadoop.hive.ql.parse.VariableSubstitution
+import org.apache.hadoop.hive.common.StatsSetupConst
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.spark.sql.catalyst.ParserDialect
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.HashMap
 import scala.language.implicitConversions
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution
-import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubQueries, OverrideCatalog, OverrideFunctionRegistry}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, QueryExecutionException, SetCommand}
+import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand}
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
+import org.apache.spark.sql.sources.DataSourceStrategy
 import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -331,7 +328,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
         val tableParameters = relation.hiveQlTable.getParameters
         val oldTotalSize =
-          Option(tableParameters.get(HiveShim.getStatsSetupConstTotalSize))
+          Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE))
             .map(_.toLong)
             .getOrElse(0L)
         val newTotalSize = getFileSizeForTable(hiveconf, relation.hiveQlTable)
@@ -342,7 +339,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
           catalog.client.alterTable(
             relation.table.copy(
               properties = relation.table.properties +
-                (HiveShim.getStatsSetupConstTotalSize -> newTotalSize.toString)))
+                (StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
         }
       case otherRelation =>
         throw new UnsupportedOperationException(
@@ -564,7 +561,7 @@ private[hive] object HiveContext {
     case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8")
     case (decimal: java.math.BigDecimal, DecimalType()) =>
       // Hive strips trailing zeros so use its toString
-      HiveShim.createDecimal(decimal).toString
+      HiveDecimal.create(decimal).toString
     case (other, tpe) if primitiveTypes contains tpe => other.toString
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 24cd335082..c466203cd0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.hive.serde2.objectinspector.{StructField => HiveStructField, _}
+import org.apache.hadoop.hive.serde2.typeinfo.{DecimalTypeInfo, TypeInfoFactory}
 import org.apache.hadoop.hive.serde2.{io => hiveIo}
 import org.apache.hadoop.{io => hadoopIo}
 
@@ -350,7 +351,7 @@ private[hive] trait HiveInspectors {
         new HiveVarchar(s, s.size)
 
     case _: JavaHiveDecimalObjectInspector =>
-      (o: Any) => HiveShim.createDecimal(o.asInstanceOf[Decimal].toJavaBigDecimal)
+      (o: Any) => HiveDecimal.create(o.asInstanceOf[Decimal].toJavaBigDecimal)
 
     case _: JavaDateObjectInspector =>
       (o: Any) => DateUtils.toJavaDate(o.asInstanceOf[Int])
@@ -439,31 +440,31 @@ private[hive] trait HiveInspectors {
     case _ if a == null => null
     case x: PrimitiveObjectInspector => x match {
       // TODO we don't support the HiveVarcharObjectInspector yet.
-      case _: StringObjectInspector if x.preferWritable() => HiveShim.getStringWritable(a)
+      case _: StringObjectInspector if x.preferWritable() => getStringWritable(a)
       case _: StringObjectInspector => a.asInstanceOf[UTF8String].toString()
-      case _: IntObjectInspector if x.preferWritable() => HiveShim.getIntWritable(a)
+      case _: IntObjectInspector if x.preferWritable() => getIntWritable(a)
       case _: IntObjectInspector => a.asInstanceOf[java.lang.Integer]
-      case _: BooleanObjectInspector if x.preferWritable() => HiveShim.getBooleanWritable(a)
+      case _: BooleanObjectInspector if x.preferWritable() => getBooleanWritable(a)
       case _: BooleanObjectInspector => a.asInstanceOf[java.lang.Boolean]
-      case _: FloatObjectInspector if x.preferWritable() => HiveShim.getFloatWritable(a)
+      case _: FloatObjectInspector if x.preferWritable() => getFloatWritable(a)
       case _: FloatObjectInspector => a.asInstanceOf[java.lang.Float]
-      case _: DoubleObjectInspector if x.preferWritable() => HiveShim.getDoubleWritable(a)
+      case _: DoubleObjectInspector if x.preferWritable() => getDoubleWritable(a)
       case _: DoubleObjectInspector => a.asInstanceOf[java.lang.Double]
-      case _: LongObjectInspector if x.preferWritable() => HiveShim.getLongWritable(a)
+      case _: LongObjectInspector if x.preferWritable() => getLongWritable(a)
       case _: LongObjectInspector => a.asInstanceOf[java.lang.Long]
-      case _: ShortObjectInspector if x.preferWritable() => HiveShim.getShortWritable(a)
+      case _: ShortObjectInspector if x.preferWritable() => getShortWritable(a)
       case _: ShortObjectInspector => a.asInstanceOf[java.lang.Short]
-      case _: ByteObjectInspector if x.preferWritable() => HiveShim.getByteWritable(a)
+      case _: ByteObjectInspector if x.preferWritable() => getByteWritable(a)
       case _: ByteObjectInspector => a.asInstanceOf[java.lang.Byte]
       case _: HiveDecimalObjectInspector if x.preferWritable() =>
-        HiveShim.getDecimalWritable(a.asInstanceOf[Decimal])
+        getDecimalWritable(a.asInstanceOf[Decimal])
       case _: HiveDecimalObjectInspector =>
-        HiveShim.createDecimal(a.asInstanceOf[Decimal].toJavaBigDecimal)
-      case _: BinaryObjectInspector if x.preferWritable() => HiveShim.getBinaryWritable(a)
+        HiveDecimal.create(a.asInstanceOf[Decimal].toJavaBigDecimal)
+      case _: BinaryObjectInspector if x.preferWritable() => getBinaryWritable(a)
       case _: BinaryObjectInspector => a.asInstanceOf[Array[Byte]]
-      case _: DateObjectInspector if x.preferWritable() => HiveShim.getDateWritable(a)
+      case _: DateObjectInspector if x.preferWritable() => getDateWritable(a)
       case _: DateObjectInspector => DateUtils.toJavaDate(a.asInstanceOf[Int])
-      case _: TimestampObjectInspector if x.preferWritable() => HiveShim.getTimestampWritable(a)
+      case _: TimestampObjectInspector if x.preferWritable() => getTimestampWritable(a)
       case _: TimestampObjectInspector => a.asInstanceOf[java.sql.Timestamp]
     }
     case x: SettableStructObjectInspector =>
@@ -574,31 +575,31 @@ private[hive] trait HiveInspectors {
    */
   def toInspector(expr: Expression): ObjectInspector = expr match {
     case Literal(value, StringType) =>
-      HiveShim.getStringWritableConstantObjectInspector(value)
+      getStringWritableConstantObjectInspector(value)
     case Literal(value, IntegerType) =>
-      HiveShim.getIntWritableConstantObjectInspector(value)
+      getIntWritableConstantObjectInspector(value)
     case Literal(value, DoubleType) =>
-      HiveShim.getDoubleWritableConstantObjectInspector(value)
+      getDoubleWritableConstantObjectInspector(value)
     case Literal(value, BooleanType) =>
-      HiveShim.getBooleanWritableConstantObjectInspector(value)
+      getBooleanWritableConstantObjectInspector(value)
     case Literal(value, LongType) =>
-      HiveShim.getLongWritableConstantObjectInspector(value)
+      getLongWritableConstantObjectInspector(value)
     case Literal(value, FloatType) =>
-      HiveShim.getFloatWritableConstantObjectInspector(value)
+      getFloatWritableConstantObjectInspector(value)
     case Literal(value, ShortType) =>
-      HiveShim.getShortWritableConstantObjectInspector(value)
+      getShortWritableConstantObjectInspector(value)
     case Literal(value, ByteType) =>
-      HiveShim.getByteWritableConstantObjectInspector(value)
+      getByteWritableConstantObjectInspector(value)
     case Literal(value, BinaryType) =>
-      HiveShim.getBinaryWritableConstantObjectInspector(value)
+      getBinaryWritableConstantObjectInspector(value)
     case Literal(value, DateType) =>
-      HiveShim.getDateWritableConstantObjectInspector(value)
+      getDateWritableConstantObjectInspector(value)
     case Literal(value, TimestampType) =>
-      HiveShim.getTimestampWritableConstantObjectInspector(value)
+      getTimestampWritableConstantObjectInspector(value)
     case Literal(value, DecimalType()) =>
-      HiveShim.getDecimalWritableConstantObjectInspector(value)
+      getDecimalWritableConstantObjectInspector(value)
     case Literal(_, NullType) =>
-      HiveShim.getPrimitiveNullWritableConstantObjectInspector
+      getPrimitiveNullWritableConstantObjectInspector
     case Literal(value, ArrayType(dt, _)) =>
       val listObjectInspector = toInspector(dt)
       if (value == null) {
@@ -658,8 +659,8 @@ private[hive] trait HiveInspectors {
     case _: JavaFloatObjectInspector => FloatType
     case _: WritableBinaryObjectInspector => BinaryType
     case _: JavaBinaryObjectInspector => BinaryType
-    case w: WritableHiveDecimalObjectInspector => HiveShim.decimalTypeInfoToCatalyst(w)
-    case j: JavaHiveDecimalObjectInspector => HiveShim.decimalTypeInfoToCatalyst(j)
+    case w: WritableHiveDecimalObjectInspector => decimalTypeInfoToCatalyst(w)
+    case j: JavaHiveDecimalObjectInspector => decimalTypeInfoToCatalyst(j)
     case _: WritableDateObjectInspector => DateType
     case _: JavaDateObjectInspector => DateType
     case _: WritableTimestampObjectInspector => TimestampType
@@ -668,10 +669,136 @@ private[hive] trait HiveInspectors {
     case _: JavaVoidObjectInspector => NullType
   }
 
+  private def decimalTypeInfoToCatalyst(inspector: PrimitiveObjectInspector): DecimalType = {
+    val info = inspector.getTypeInfo.asInstanceOf[DecimalTypeInfo]
+    DecimalType(info.precision(), info.scale())
+  }
+
+  private def getStringWritableConstantObjectInspector(value: Any): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.stringTypeInfo, getStringWritable(value))
+
+  private def getIntWritableConstantObjectInspector(value: Any): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.intTypeInfo, getIntWritable(value))
+
+  private def getDoubleWritableConstantObjectInspector(value: Any): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.doubleTypeInfo, getDoubleWritable(value))
+
+  private def getBooleanWritableConstantObjectInspector(value: Any): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.booleanTypeInfo, getBooleanWritable(value))
+
+  private def getLongWritableConstantObjectInspector(value: Any): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.longTypeInfo, getLongWritable(value))
+
+  private def getFloatWritableConstantObjectInspector(value: Any): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.floatTypeInfo, getFloatWritable(value))
+
+  private def getShortWritableConstantObjectInspector(value: Any): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.shortTypeInfo, getShortWritable(value))
+
+  private def getByteWritableConstantObjectInspector(value: Any): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.byteTypeInfo, getByteWritable(value))
+
+  private def getBinaryWritableConstantObjectInspector(value: Any): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.binaryTypeInfo, getBinaryWritable(value))
+
+  private def getDateWritableConstantObjectInspector(value: Any): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.dateTypeInfo, getDateWritable(value))
+
+  private def getTimestampWritableConstantObjectInspector(value: Any): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.timestampTypeInfo, getTimestampWritable(value))
+
+  private def getDecimalWritableConstantObjectInspector(value: Any): ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.decimalTypeInfo, getDecimalWritable(value))
+
+  private def getPrimitiveNullWritableConstantObjectInspector: ObjectInspector =
+    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
+      TypeInfoFactory.voidTypeInfo, null)
+
+  private def getStringWritable(value: Any): hadoopIo.Text =
+    if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].toString)
+
+  private def getIntWritable(value: Any): hadoopIo.IntWritable =
+    if (value == null) null else new hadoopIo.IntWritable(value.asInstanceOf[Int])
+
+  private def getDoubleWritable(value: Any): hiveIo.DoubleWritable =
+    if (value == null) {
+      null
+    } else {
+      new hiveIo.DoubleWritable(value.asInstanceOf[Double])
+    }
+
+  private def getBooleanWritable(value: Any): hadoopIo.BooleanWritable =
+    if (value == null) {
+      null
+    } else {
+      new hadoopIo.BooleanWritable(value.asInstanceOf[Boolean])
+    }
+
+  private def getLongWritable(value: Any): hadoopIo.LongWritable =
+    if (value == null) null else new hadoopIo.LongWritable(value.asInstanceOf[Long])
+
+  private def getFloatWritable(value: Any): hadoopIo.FloatWritable =
+    if (value == null) {
+      null
+    } else {
+      new hadoopIo.FloatWritable(value.asInstanceOf[Float])
+    }
+
+  private def getShortWritable(value: Any): hiveIo.ShortWritable =
+    if (value == null) null else new hiveIo.ShortWritable(value.asInstanceOf[Short])
+
+  private def getByteWritable(value: Any): hiveIo.ByteWritable =
+    if (value == null) null else new hiveIo.ByteWritable(value.asInstanceOf[Byte])
+
+  private def getBinaryWritable(value: Any): hadoopIo.BytesWritable =
+    if (value == null) {
+      null
+    } else {
+      new hadoopIo.BytesWritable(value.asInstanceOf[Array[Byte]])
+    }
+
+  private def getDateWritable(value: Any): hiveIo.DateWritable =
+    if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[Int])
+
+  private def getTimestampWritable(value: Any): hiveIo.TimestampWritable =
+    if (value == null) {
+      null
+    } else {
+      new hiveIo.TimestampWritable(value.asInstanceOf[java.sql.Timestamp])
+    }
+
+  private def getDecimalWritable(value: Any): hiveIo.HiveDecimalWritable =
+    if (value == null) {
+      null
+    } else {
+      // TODO precise, scale?
+      new hiveIo.HiveDecimalWritable(
+        HiveDecimal.create(value.asInstanceOf[Decimal].toJavaBigDecimal))
+    }
+
   implicit class typeInfoConversions(dt: DataType) {
     import org.apache.hadoop.hive.serde2.typeinfo._
     import TypeInfoFactory._
 
+    private def decimalTypeInfo(decimalType: DecimalType): TypeInfo = decimalType match {
+      case DecimalType.Fixed(precision, scale) => new DecimalTypeInfo(precision, scale)
+      case _ => new DecimalTypeInfo(
+        HiveShim.UNLIMITED_DECIMAL_PRECISION,
+        HiveShim.UNLIMITED_DECIMAL_SCALE)
+    }
+
     def toTypeInfo: TypeInfo = dt match {
       case ArrayType(elemType, _) =>
         getListTypeInfo(elemType.toTypeInfo)
@@ -690,7 +817,7 @@ private[hive] trait HiveInspectors {
       case LongType => longTypeInfo
       case ShortType => shortTypeInfo
       case StringType => stringTypeInfo
-      case d: DecimalType => HiveShim.decimalTypeInfo(d)
+      case d: DecimalType => decimalTypeInfo(d)
       case DateType => dateTypeInfo
       case TimestampType => timestampTypeInfo
       case NullType => voidTypeInfo
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ca1f49b546..5a4651a887 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql.hive
 
 import com.google.common.base.Objects
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.metastore.Warehouse
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.ql.metadata._
-import org.apache.hadoop.hive.serde2.Deserializer
+import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
@@ -37,7 +39,6 @@ import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources}
-import org.apache.spark.util.Utils
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
@@ -670,8 +671,8 @@ private[hive] case class MetastoreRelation
 
   @transient override lazy val statistics: Statistics = Statistics(
     sizeInBytes = {
-      val totalSize = hiveQlTable.getParameters.get(HiveShim.getStatsSetupConstTotalSize)
-      val rawDataSize = hiveQlTable.getParameters.get(HiveShim.getStatsSetupConstRawDataSize)
+      val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
+      val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
       // TODO: check if this estimate is valid for tables after partition pruning.
       // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
       // relatively cheap if parameters for the table are populated into the metastore.  An
@@ -697,11 +698,7 @@ private[hive] case class MetastoreRelation
     }
   }
 
-  val tableDesc = HiveShim.getTableDesc(
-    Class.forName(
-      hiveQlTable.getSerializationLib,
-      true,
-      Utils.getContextOrSparkClassLoader).asInstanceOf[Class[Deserializer]],
+  val tableDesc = new TableDesc(
     hiveQlTable.getInputFormatClass,
     // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
     // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
@@ -743,6 +740,11 @@ private[hive] case class MetastoreRelation
 private[hive] object HiveMetastoreTypes {
   def toDataType(metastoreType: String): DataType = DataTypeParser.parse(metastoreType)
 
+  def decimalMetastoreString(decimalType: DecimalType): String = decimalType match {
+    case DecimalType.Fixed(precision, scale) => s"decimal($precision,$scale)"
+    case _ => s"decimal($HiveShim.UNLIMITED_DECIMAL_PRECISION,$HiveShim.UNLIMITED_DECIMAL_SCALE)"
+  }
+
   def toMetastoreType(dt: DataType): String = dt match {
     case ArrayType(elementType, _) => s"array<${toMetastoreType(elementType)}>"
     case StructType(fields) =>
@@ -759,7 +761,7 @@ private[hive] object HiveMetastoreTypes {
     case BinaryType => "binary"
     case BooleanType => "boolean"
     case DateType => "date"
-    case d: DecimalType => HiveShim.decimalMetastoreString(d)
+    case d: DecimalType => decimalMetastoreString(d)
     case TimestampType => "timestamp"
     case NullType => "void"
     case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index a5ca3613c5..9544d12c90 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.hive
 
 import java.sql.Date
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.ql.{ErrorMsg, Context}
@@ -39,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.execution.ExplainCommand
 import org.apache.spark.sql.sources.DescribeCommand
+import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable, HiveScriptIOSchema}
 import org.apache.spark.sql.types._
@@ -46,6 +45,7 @@ import org.apache.spark.util.random.RandomSampler
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
 
 /**
  * Used when we need to start parsing the AST before deciding that we are going to pass the command
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
new file mode 100644
index 0000000000..fa5409f602
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.{InputStream, OutputStream}
+import java.rmi.server.UID
+
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector
+import org.apache.hadoop.io.Writable
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.util.Utils
+
+/* Implicit conversions */
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+private[hive] object HiveShim {
+  // Precision and scale to pass for unlimited decimals; these are the same as the precision and
+  // scale Hive 0.13 infers for BigDecimals from sources that don't specify them (e.g. UDFs)
+  val UNLIMITED_DECIMAL_PRECISION = 38
+  val UNLIMITED_DECIMAL_SCALE = 18
+
+  /*
+   * This function in hive-0.13 become private, but we have to do this to walkaround hive bug
+   */
+  private def appendReadColumnNames(conf: Configuration, cols: Seq[String]) {
+    val old: String = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "")
+    val result: StringBuilder = new StringBuilder(old)
+    var first: Boolean = old.isEmpty
+
+    for (col <- cols) {
+      if (first) {
+        first = false
+      } else {
+        result.append(',')
+      }
+      result.append(col)
+    }
+    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, result.toString)
+  }
+
+  /*
+   * Cannot use ColumnProjectionUtils.appendReadColumns directly, if ids is null or empty
+   */
+  def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) {
+    if (ids != null && ids.size > 0) {
+      ColumnProjectionUtils.appendReadColumns(conf, ids)
+    }
+    if (names != null && names.size > 0) {
+      appendReadColumnNames(conf, names)
+    }
+  }
+
+  /*
+   * Bug introduced in hive-0.13. AvroGenericRecordWritable has a member recordReaderID that
+   * is needed to initialize before serialization.
+   */
+  def prepareWritable(w: Writable): Writable = {
+    w match {
+      case w: AvroGenericRecordWritable =>
+        w.setRecordReaderID(new UID())
+      case _ =>
+    }
+    w
+  }
+
+  def toCatalystDecimal(hdoi: HiveDecimalObjectInspector, data: Any): Decimal = {
+    if (hdoi.preferWritable()) {
+      Decimal(hdoi.getPrimitiveWritableObject(data).getHiveDecimal().bigDecimalValue,
+        hdoi.precision(), hdoi.scale())
+    } else {
+      Decimal(hdoi.getPrimitiveJavaObject(data).bigDecimalValue(), hdoi.precision(), hdoi.scale())
+    }
+  }
+
+  /**
+   * This class provides the UDF creation and also the UDF instance serialization and
+   * de-serialization cross process boundary.
+   *
+   * Detail discussion can be found at https://github.com/apache/spark/pull/3640
+   *
+   * @param functionClassName UDF class name
+   */
+  private[hive] case class HiveFunctionWrapper(var functionClassName: String)
+    extends java.io.Externalizable {
+
+    // for Serialization
+    def this() = this(null)
+
+    @transient
+    def deserializeObjectByKryo[T: ClassTag](
+        kryo: Kryo,
+        in: InputStream,
+        clazz: Class[_]): T = {
+      val inp = new Input(in)
+      val t: T = kryo.readObject(inp, clazz).asInstanceOf[T]
+      inp.close()
+      t
+    }
+
+    @transient
+    def serializeObjectByKryo(
+        kryo: Kryo,
+        plan: Object,
+        out: OutputStream) {
+      val output: Output = new Output(out)
+      kryo.writeObject(output, plan)
+      output.close()
+    }
+
+    def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
+      deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz)
+        .asInstanceOf[UDFType]
+    }
+
+    def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
+      serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
+    }
+
+    private var instance: AnyRef = null
+
+    def writeExternal(out: java.io.ObjectOutput) {
+      // output the function name
+      out.writeUTF(functionClassName)
+
+      // Write a flag if instance is null or not
+      out.writeBoolean(instance != null)
+      if (instance != null) {
+        // Some of the UDF are serializable, but some others are not
+        // Hive Utilities can handle both cases
+        val baos = new java.io.ByteArrayOutputStream()
+        serializePlan(instance, baos)
+        val functionInBytes = baos.toByteArray
+
+        // output the function bytes
+        out.writeInt(functionInBytes.length)
+        out.write(functionInBytes, 0, functionInBytes.length)
+      }
+    }
+
+    def readExternal(in: java.io.ObjectInput) {
+      // read the function name
+      functionClassName = in.readUTF()
+
+      if (in.readBoolean()) {
+        // if the instance is not null
+        // read the function in bytes
+        val functionInBytesLength = in.readInt()
+        val functionInBytes = new Array[Byte](functionInBytesLength)
+        in.read(functionInBytes, 0, functionInBytesLength)
+
+        // deserialize the function object via Hive Utilities
+        instance = deserializePlan[AnyRef](new java.io.ByteArrayInputStream(functionInBytes),
+          Utils.getContextOrSparkClassLoader.loadClass(functionClassName))
+      }
+    }
+
+    def createFunction[UDFType <: AnyRef](): UDFType = {
+      if (instance != null) {
+        instance.asInstanceOf[UDFType]
+      } else {
+        val func = Utils.getContextOrSparkClassLoader
+          .loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
+        if (!func.isInstanceOf[UDF]) {
+          // We cache the function if it's no the Simple UDF,
+          // as we always have to create new instance for Simple UDF
+          instance = func
+        }
+        func
+      }
+    }
+  }
+
+  /*
+ * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
+ * Fix it through wrapper.
+ * */
+  implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc = {
+    var f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed)
+    f.setCompressCodec(w.compressCodec)
+    f.setCompressType(w.compressType)
+    f.setTableInfo(w.tableInfo)
+    f.setDestTableId(w.destTableId)
+    f
+  }
+
+  /*
+   * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
+   * Fix it through wrapper.
+   */
+  private[hive] class ShimFileSinkDesc(
+      var dir: String,
+      var tableInfo: TableDesc,
+      var compressed: Boolean)
+    extends Serializable with Logging {
+    var compressCodec: String = _
+    var compressType: String = _
+    var destTableId: Int = _
+
+    def setCompressed(compressed: Boolean) {
+      this.compressed = compressed
+    }
+
+    def getDirName(): String = dir
+
+    def setDestTableId(destTableId: Int) {
+      this.destTableId = destTableId
+    }
+
+    def setTableInfo(tableInfo: TableDesc) {
+      this.tableInfo = tableInfo
+    }
+
+    def setCompressCodec(intermediateCompressorCodec: String) {
+      compressCodec = intermediateCompressorCodec
+    }
+
+    def setCompressType(intermediateCompressType: String) {
+      compressType = intermediateCompressType
+    }
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 294fc3bd7d..334bfccc9d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -25,14 +25,13 @@ import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
 import org.apache.hadoop.hive.serde2.Deserializer
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
 
-import org.apache.spark.SerializableWritable
+import org.apache.spark.{Logging, SerializableWritable}
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.Logging
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.DateUtils
@@ -172,7 +171,7 @@ class HadoopTableReader(
               path.toString + tails
             }
 
-            val partPath = HiveShim.getDataLocationPath(partition)
+            val partPath = partition.getDataLocation
             val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
             var pathPatternStr = getPathPatternByPath(partNum, partPath)
             if (!pathPatternSet.contains(pathPatternStr)) {
@@ -187,7 +186,7 @@ class HadoopTableReader(
     val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer)
       .map { case (partition, partDeserializer) =>
       val partDesc = Utilities.getPartitionDesc(partition)
-      val partPath = HiveShim.getDataLocationPath(partition)
+      val partPath = partition.getDataLocation
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
       val ifc = partDesc.getInputFileFormatClass
         .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
@@ -325,7 +324,7 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging {
     val soi = if (rawDeser.getObjectInspector.equals(tableDeser.getObjectInspector)) {
       rawDeser.getObjectInspector.asInstanceOf[StructObjectInspector]
     } else {
-      HiveShim.getConvertedOI(
+      ObjectInspectorConverters.getConvertedOI(
         rawDeser.getObjectInspector,
         tableDeser.getObjectInspector).asInstanceOf[StructObjectInspector]
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 8613332186..eeb472602b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -19,27 +19,25 @@ package org.apache.spark.sql.hive.execution
 
 import java.util
 
-import scala.collection.JavaConversions._
-
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
-import org.apache.hadoop.hive.ql.metadata.Hive
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
 import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.{ ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
 
+import scala.collection.JavaConversions._
+
 private[hive]
 case class InsertIntoHiveTable(
     table: MetastoreRelation,
@@ -126,7 +124,7 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = HiveShim.getExternalTmpPath(hiveContext, tableLocation)
+    val tmpLocation = hiveContext.getExternalTmpPath(tableLocation.toUri)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed = sc.hiveconf.getBoolean(
       ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index 1658bb93b0..01f47352b2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.sql.types._
 
 /* Implicit conversions */
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 2bb526b14b..ee440e304e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -35,8 +35,7 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.sql.Row
 import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
 import org.apache.spark.sql.catalyst.util.DateUtils
-import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.hive.HiveShim._
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.types._
 
 /**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 58e2d1fbfa..af586712e3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -561,30 +561,28 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeA
     }
   }
 
-  if (HiveShim.version == "0.13.1") {
-    test("scan a parquet table created through a CTAS statement") {
-      withSQLConf(
-        "spark.sql.hive.convertMetastoreParquet" -> "true",
-        SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
-
-        withTempTable("jt") {
-          (1 to 10).map(i => i -> s"str$i").toDF("a", "b").registerTempTable("jt")
-
-          withTable("test_parquet_ctas") {
-            sql(
-              """CREATE TABLE test_parquet_ctas STORED AS PARQUET
-                |AS SELECT tmp.a FROM jt tmp WHERE tmp.a < 5
-              """.stripMargin)
-
-            checkAnswer(
-              sql(s"SELECT a FROM test_parquet_ctas WHERE a > 2 "),
-              Row(3) :: Row(4) :: Nil)
-
-            table("test_parquet_ctas").queryExecution.optimizedPlan match {
-              case LogicalRelation(p: ParquetRelation2) => // OK
-              case _ =>
-                fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation2]}")
-            }
+  test("scan a parquet table created through a CTAS statement") {
+    withSQLConf(
+      "spark.sql.hive.convertMetastoreParquet" -> "true",
+      SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
+
+      withTempTable("jt") {
+        (1 to 10).map(i => i -> s"str$i").toDF("a", "b").registerTempTable("jt")
+
+        withTable("test_parquet_ctas") {
+          sql(
+            """CREATE TABLE test_parquet_ctas STORED AS PARQUET
+              |AS SELECT tmp.a FROM jt tmp WHERE tmp.a < 5
+            """.stripMargin)
+
+          checkAnswer(
+            sql(s"SELECT a FROM test_parquet_ctas WHERE a > 2 "),
+            Row(3) :: Row(4) :: Nil)
+
+          table("test_parquet_ctas").queryExecution.optimizedPlan match {
+            case LogicalRelation(p: ParquetRelation2) => // OK
+            case _ =>
+              fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation2]}")
           }
         }
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 00a69de9e4..e16e530555 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -79,10 +79,6 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
     sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
     sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
 
-    // TODO: How does it works? needs to add it back for other hive version.
-    if (HiveShim.version =="0.12.0") {
-      assert(queryTotalSize("analyzeTable") === conf.defaultSizeInBytes)
-    }
     sql("ANALYZE TABLE analyzeTable COMPUTE STATISTICS noscan")
 
     assert(queryTotalSize("analyzeTable") === BigInt(11624))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 440b7c87b0..6d8d99ebc8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -874,15 +874,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
           |WITH serdeproperties('s1'='9')
         """.stripMargin)
     }
-    // Now only verify 0.12.0, and ignore other versions due to binary compatibility
-    // current TestSerDe.jar is from 0.12.0
-    if (HiveShim.version == "0.12.0") {
-      sql(s"ADD JAR $testJar")
-      sql(
-        """ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe'
-          |WITH serdeproperties('s1'='9')
-        """.stripMargin)
-    }
     sql("DROP TABLE alter1")
   }
 
@@ -890,15 +881,13 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     // this is a test case from mapjoin_addjar.q
     val testJar = TestHive.getHiveFile("hive-hcatalog-core-0.13.1.jar").getCanonicalPath
     val testData = TestHive.getHiveFile("data/files/sample.json").getCanonicalPath
-    if (HiveShim.version == "0.13.1") {
-      sql(s"ADD JAR $testJar")
-      sql(
-        """CREATE TABLE t1(a string, b string)
-        |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'""".stripMargin)
-      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE t1""")
-      sql("select * from src join t1 on src.key = t1.a")
-      sql("DROP TABLE t1")
-    }
+    sql(s"ADD JAR $testJar")
+    sql(
+      """CREATE TABLE t1(a string, b string)
+      |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'""".stripMargin)
+    sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE t1""")
+    sql("select * from src join t1 on src.key = t1.a")
+    sql("DROP TABLE t1")
   }
 
   test("ADD FILE command") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index aba3becb1b..40a35674e4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation}
+import org.apache.spark.sql.hive.{HiveQLDialect, MetastoreRelation}
 import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
@@ -330,35 +330,33 @@ class SQLQuerySuite extends QueryTest {
       "serde_p1=p1", "serde_p2=p2", "tbl_p1=p11", "tbl_p2=p22", "MANAGED_TABLE"
     )
 
-    if (HiveShim.version =="0.13.1") {
-      val origUseParquetDataSource = conf.parquetUseDataSourceApi
-      try {
-        setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
-        sql(
-          """CREATE TABLE ctas5
-            | STORED AS parquet AS
-            |   SELECT key, value
-            |   FROM src
-            |   ORDER BY key, value""".stripMargin).collect()
-
-        checkExistence(sql("DESC EXTENDED ctas5"), true,
-          "name:key", "type:string", "name:value", "ctas5",
-          "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
-          "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
-          "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
-          "MANAGED_TABLE"
-        )
-
-        val default = getConf("spark.sql.hive.convertMetastoreParquet", "true")
-        // use the Hive SerDe for parquet tables
-        sql("set spark.sql.hive.convertMetastoreParquet = false")
-        checkAnswer(
-          sql("SELECT key, value FROM ctas5 ORDER BY key, value"),
-          sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq)
-        sql(s"set spark.sql.hive.convertMetastoreParquet = $default")
-      } finally {
-        setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, origUseParquetDataSource.toString)
-      }
+    val origUseParquetDataSource = conf.parquetUseDataSourceApi
+    try {
+      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+      sql(
+        """CREATE TABLE ctas5
+          | STORED AS parquet AS
+          |   SELECT key, value
+          |   FROM src
+          |   ORDER BY key, value""".stripMargin).collect()
+
+      checkExistence(sql("DESC EXTENDED ctas5"), true,
+        "name:key", "type:string", "name:value", "ctas5",
+        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
+        "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
+        "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
+        "MANAGED_TABLE"
+      )
+
+      val default = getConf("spark.sql.hive.convertMetastoreParquet", "true")
+      // use the Hive SerDe for parquet tables
+      sql("set spark.sql.hive.convertMetastoreParquet = false")
+      checkAnswer(
+        sql("SELECT key, value FROM ctas5 ORDER BY key, value"),
+        sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq)
+      sql(s"set spark.sql.hive.convertMetastoreParquet = $default")
+    } finally {
+      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, origUseParquetDataSource.toString)
     }
   }
 
diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
deleted file mode 100644
index dbc5e029e2..0000000000
--- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
+++ /dev/null
@@ -1,457 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.rmi.server.UID
-import java.util.{Properties, ArrayList => JArrayList}
-import java.io.{OutputStream, InputStream}
-
-import scala.collection.JavaConversions._
-import scala.language.implicitConversions
-import scala.reflect.ClassTag
-
-import com.esotericsoftware.kryo.Kryo
-import com.esotericsoftware.kryo.io.Input
-import com.esotericsoftware.kryo.io.Output
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.Context
-import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
-import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
-import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
-import org.apache.hadoop.hive.serde.serdeConstants
-import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector, PrimitiveObjectInspectorFactory}
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorConverters, PrimitiveObjectInspector}
-import org.apache.hadoop.hive.serde2.typeinfo.{DecimalTypeInfo, TypeInfo, TypeInfoFactory}
-import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Deserializer, io => hiveIo}
-import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.{io => hadoopIo}
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.types.{Decimal, DecimalType, UTF8String}
-import org.apache.spark.util.Utils._
-
-/**
- * This class provides the UDF creation and also the UDF instance serialization and
- * de-serialization cross process boundary.
- * 
- * Detail discussion can be found at https://github.com/apache/spark/pull/3640
- *
- * @param functionClassName UDF class name
- */
-private[hive] case class HiveFunctionWrapper(var functionClassName: String)
-  extends java.io.Externalizable {
-
-  // for Serialization
-  def this() = this(null)
-
-  @transient
-  def deserializeObjectByKryo[T: ClassTag](
-      kryo: Kryo,
-      in: InputStream,
-      clazz: Class[_]): T = {
-    val inp = new Input(in)
-    val t: T = kryo.readObject(inp,clazz).asInstanceOf[T]
-    inp.close()
-    t
-  }
-
-  @transient
-  def serializeObjectByKryo(
-      kryo: Kryo,
-      plan: Object,
-      out: OutputStream ) {
-    val output: Output = new Output(out)
-    kryo.writeObject(output, plan)
-    output.close()
-  }
-
-  def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
-    deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz)
-      .asInstanceOf[UDFType]
-  }
-
-  def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
-    serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
-  }
-
-  private var instance: AnyRef = null
-
-  def writeExternal(out: java.io.ObjectOutput) {
-    // output the function name
-    out.writeUTF(functionClassName)
-
-    // Write a flag if instance is null or not
-    out.writeBoolean(instance != null)
-    if (instance != null) {
-      // Some of the UDF are serializable, but some others are not
-      // Hive Utilities can handle both cases
-      val baos = new java.io.ByteArrayOutputStream()
-      serializePlan(instance, baos)
-      val functionInBytes = baos.toByteArray
-
-      // output the function bytes
-      out.writeInt(functionInBytes.length)
-      out.write(functionInBytes, 0, functionInBytes.length)
-    }
-  }
-
-  def readExternal(in: java.io.ObjectInput) {
-    // read the function name
-    functionClassName = in.readUTF()
-
-    if (in.readBoolean()) {
-      // if the instance is not null
-      // read the function in bytes
-      val functionInBytesLength = in.readInt()
-      val functionInBytes = new Array[Byte](functionInBytesLength)
-      in.read(functionInBytes, 0, functionInBytesLength)
-
-      // deserialize the function object via Hive Utilities
-      instance = deserializePlan[AnyRef](new java.io.ByteArrayInputStream(functionInBytes),
-        getContextOrSparkClassLoader.loadClass(functionClassName))
-    }
-  }
-
-  def createFunction[UDFType <: AnyRef](): UDFType = {
-    if (instance != null) {
-      instance.asInstanceOf[UDFType]
-    } else {
-      val func = getContextOrSparkClassLoader
-                   .loadClass(functionClassName).newInstance.asInstanceOf[UDFType]
-      if (!func.isInstanceOf[UDF]) {
-        // We cache the function if it's no the Simple UDF,
-        // as we always have to create new instance for Simple UDF
-        instance = func
-      }
-      func
-    }
-  }
-}
-
-/**
- * A compatibility layer for interacting with Hive version 0.13.1.
- */
-private[hive] object HiveShim {
-  val version = "0.13.1"
-
-  def getTableDesc(
-    serdeClass: Class[_ <: Deserializer],
-    inputFormatClass: Class[_ <: InputFormat[_, _]],
-    outputFormatClass: Class[_],
-    properties: Properties) = {
-    new TableDesc(inputFormatClass, outputFormatClass, properties)
-  }
-
-
-  def getStringWritableConstantObjectInspector(value: Any): ObjectInspector =
-    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-      TypeInfoFactory.stringTypeInfo, getStringWritable(value))
-
-  def getIntWritableConstantObjectInspector(value: Any): ObjectInspector =
-    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-      TypeInfoFactory.intTypeInfo, getIntWritable(value))
-
-  def getDoubleWritableConstantObjectInspector(value: Any): ObjectInspector =
-    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-      TypeInfoFactory.doubleTypeInfo, getDoubleWritable(value))
-
-  def getBooleanWritableConstantObjectInspector(value: Any): ObjectInspector =
-    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-      TypeInfoFactory.booleanTypeInfo, getBooleanWritable(value))
-
-  def getLongWritableConstantObjectInspector(value: Any): ObjectInspector =
-    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-      TypeInfoFactory.longTypeInfo, getLongWritable(value))
-
-  def getFloatWritableConstantObjectInspector(value: Any): ObjectInspector =
-    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-      TypeInfoFactory.floatTypeInfo, getFloatWritable(value))
-
-  def getShortWritableConstantObjectInspector(value: Any): ObjectInspector =
-    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-      TypeInfoFactory.shortTypeInfo, getShortWritable(value))
-
-  def getByteWritableConstantObjectInspector(value: Any): ObjectInspector =
-    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-      TypeInfoFactory.byteTypeInfo, getByteWritable(value))
-
-  def getBinaryWritableConstantObjectInspector(value: Any): ObjectInspector =
-    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-      TypeInfoFactory.binaryTypeInfo, getBinaryWritable(value))
-
-  def getDateWritableConstantObjectInspector(value: Any): ObjectInspector =
-    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-      TypeInfoFactory.dateTypeInfo, getDateWritable(value))
-
-  def getTimestampWritableConstantObjectInspector(value: Any): ObjectInspector =
-    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-      TypeInfoFactory.timestampTypeInfo, getTimestampWritable(value))
-
-  def getDecimalWritableConstantObjectInspector(value: Any): ObjectInspector =
-    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-      TypeInfoFactory.decimalTypeInfo, getDecimalWritable(value))
-
-  def getPrimitiveNullWritableConstantObjectInspector: ObjectInspector =
-    PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector(
-      TypeInfoFactory.voidTypeInfo, null)
-
-  def getStringWritable(value: Any): hadoopIo.Text =
-    if (value == null) null else new hadoopIo.Text(value.asInstanceOf[UTF8String].toString)
-
-  def getIntWritable(value: Any): hadoopIo.IntWritable =
-    if (value == null) null else new hadoopIo.IntWritable(value.asInstanceOf[Int])
-
-  def getDoubleWritable(value: Any): hiveIo.DoubleWritable =
-    if (value == null) {
-      null
-    } else {
-      new hiveIo.DoubleWritable(value.asInstanceOf[Double])
-    }
-
-  def getBooleanWritable(value: Any): hadoopIo.BooleanWritable =
-    if (value == null) {
-      null
-    } else {
-      new hadoopIo.BooleanWritable(value.asInstanceOf[Boolean])
-    }
-
-  def getLongWritable(value: Any): hadoopIo.LongWritable =
-    if (value == null) null else new hadoopIo.LongWritable(value.asInstanceOf[Long])
-
-  def getFloatWritable(value: Any): hadoopIo.FloatWritable =
-    if (value == null) {
-      null
-    } else {
-      new hadoopIo.FloatWritable(value.asInstanceOf[Float])
-    }
-
-  def getShortWritable(value: Any): hiveIo.ShortWritable =
-    if (value == null) null else new hiveIo.ShortWritable(value.asInstanceOf[Short])
-
-  def getByteWritable(value: Any): hiveIo.ByteWritable =
-    if (value == null) null else new hiveIo.ByteWritable(value.asInstanceOf[Byte])
-
-  def getBinaryWritable(value: Any): hadoopIo.BytesWritable =
-    if (value == null) {
-      null
-    } else {
-      new hadoopIo.BytesWritable(value.asInstanceOf[Array[Byte]])
-    }
-
-  def getDateWritable(value: Any): hiveIo.DateWritable =
-    if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[Int])
-
-  def getTimestampWritable(value: Any): hiveIo.TimestampWritable =
-    if (value == null) {
-      null
-    } else {
-      new hiveIo.TimestampWritable(value.asInstanceOf[java.sql.Timestamp])
-    }
-
-  def getDecimalWritable(value: Any): hiveIo.HiveDecimalWritable =
-    if (value == null) {
-      null
-    } else {
-      // TODO precise, scale?
-      new hiveIo.HiveDecimalWritable(
-        HiveShim.createDecimal(value.asInstanceOf[Decimal].toJavaBigDecimal))
-    }
-
-  def getPrimitiveNullWritable: NullWritable = NullWritable.get()
-
-  def createDriverResultsArray = new JArrayList[Object]
-
-  def processResults(results: JArrayList[Object]) = {
-    results.map { r =>
-      r match {
-        case s: String => s
-        case a: Array[Object] => a(0).asInstanceOf[String]
-      }
-    }
-  }
-
-  def getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE
-
-  def getStatsSetupConstRawDataSize = StatsSetupConst.RAW_DATA_SIZE
-
-  def createDefaultDBIfNeeded(context: HiveContext) = {
-    context.runSqlHive("CREATE DATABASE default")
-    context.runSqlHive("USE default")
-  }
-
-  def getCommandProcessor(cmd: Array[String], conf: HiveConf) = {
-    CommandProcessorFactory.get(cmd, conf)
-  }
-
-  def createDecimal(bd: java.math.BigDecimal): HiveDecimal = {
-    HiveDecimal.create(bd)
-  }
-
-  /*
-   * This function in hive-0.13 become private, but we have to do this to walkaround hive bug
-   */
-  private def appendReadColumnNames(conf: Configuration, cols: Seq[String]) {
-    val old: String = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "")
-    val result: StringBuilder = new StringBuilder(old)
-    var first: Boolean = old.isEmpty
-
-    for (col <- cols) {
-      if (first) {
-        first = false
-      } else {
-        result.append(',')
-      }
-      result.append(col)
-    }
-    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, result.toString)
-  }
-
-  /*
-   * Cannot use ColumnProjectionUtils.appendReadColumns directly, if ids is null or empty
-   */
-  def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) {
-    if (ids != null && ids.size > 0) {
-      ColumnProjectionUtils.appendReadColumns(conf, ids)
-    }
-    if (names != null && names.size > 0) {
-      appendReadColumnNames(conf, names)
-    }
-  }
-
-  def getExternalTmpPath(context: Context, path: Path) = {
-    context.getExternalTmpPath(path.toUri)
-  }
-
-  def getDataLocationPath(p: Partition) = p.getDataLocation
-
-  def getAllPartitionsOf(client: Hive, tbl: Table) =  client.getAllPartitionsOf(tbl)
-
-  def compatibilityBlackList = Seq()
-
-  def setLocation(tbl: Table, crtTbl: CreateTableDesc): Unit = {
-    tbl.setDataLocation(new Path(crtTbl.getLocation()))
-  }
-
-  /*
-   * Bug introdiced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
-   * Fix it through wrapper.
-   * */
-  implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc = {
-    var f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed)
-    f.setCompressCodec(w.compressCodec)
-    f.setCompressType(w.compressType)
-    f.setTableInfo(w.tableInfo)
-    f.setDestTableId(w.destTableId)
-    f
-  }
-
-  // Precision and scale to pass for unlimited decimals; these are the same as the precision and
-  // scale Hive 0.13 infers for BigDecimals from sources that don't specify them (e.g. UDFs)
-  private val UNLIMITED_DECIMAL_PRECISION = 38
-  private val UNLIMITED_DECIMAL_SCALE = 18
-
-  def decimalMetastoreString(decimalType: DecimalType): String = decimalType match {
-    case DecimalType.Fixed(precision, scale) => s"decimal($precision,$scale)"
-    case _ => s"decimal($UNLIMITED_DECIMAL_PRECISION,$UNLIMITED_DECIMAL_SCALE)"
-  }
-
-  def decimalTypeInfo(decimalType: DecimalType): TypeInfo = decimalType match {
-    case DecimalType.Fixed(precision, scale) => new DecimalTypeInfo(precision, scale)
-    case _ => new DecimalTypeInfo(UNLIMITED_DECIMAL_PRECISION, UNLIMITED_DECIMAL_SCALE)
-  }
-
-  def decimalTypeInfoToCatalyst(inspector: PrimitiveObjectInspector): DecimalType = {
-    val info = inspector.getTypeInfo.asInstanceOf[DecimalTypeInfo]
-    DecimalType(info.precision(), info.scale())
-  }
-
-  def toCatalystDecimal(hdoi: HiveDecimalObjectInspector, data: Any): Decimal = {
-    if (hdoi.preferWritable()) {
-      Decimal(hdoi.getPrimitiveWritableObject(data).getHiveDecimal().bigDecimalValue,
-        hdoi.precision(), hdoi.scale())
-    } else {
-      Decimal(hdoi.getPrimitiveJavaObject(data).bigDecimalValue(), hdoi.precision(), hdoi.scale())
-    }
-  }
-
-  def getConvertedOI(inputOI: ObjectInspector, outputOI: ObjectInspector): ObjectInspector = {
-    ObjectInspectorConverters.getConvertedOI(inputOI, outputOI)
-  }
-
-  /*
-   * Bug introduced in hive-0.13. AvroGenericRecordWritable has a member recordReaderID that
-   * is needed to initialize before serialization.
-   */
-  def prepareWritable(w: Writable): Writable = {
-    w match {
-      case w: AvroGenericRecordWritable =>
-        w.setRecordReaderID(new UID())
-      case _ =>
-    }
-    w
-  }
-
-  def setTblNullFormat(crtTbl: CreateTableDesc, tbl: Table) = {
-    if (crtTbl != null && crtTbl.getNullFormat() != null) {
-      tbl.setSerdeParam(serdeConstants.SERIALIZATION_NULL_FORMAT, crtTbl.getNullFormat())
-    }
-  }
-}
-
-/*
- * Bug introduced in hive-0.13. FileSinkDesc is serilizable, but its member path is not.
- * Fix it through wrapper.
- */
-private[hive] class ShimFileSinkDesc(
-    var dir: String,
-    var tableInfo: TableDesc,
-    var compressed: Boolean)
-  extends Serializable with Logging {
-  var compressCodec: String = _
-  var compressType: String = _
-  var destTableId: Int = _
-
-  def setCompressed(compressed: Boolean) {
-    this.compressed = compressed
-  }
-
-  def getDirName = dir
-
-  def setDestTableId(destTableId: Int) {
-    this.destTableId = destTableId
-  }
-
-  def setTableInfo(tableInfo: TableDesc) {
-    this.tableInfo = tableInfo
-  }
-
-  def setCompressCodec(intermediateCompressorCodec: String) {
-    compressCodec = intermediateCompressorCodec
-  }
-
-  def setCompressType(intermediateCompressType: String) {
-    compressType = intermediateCompressType
-  }
-}
-- 
GitLab