diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index a4e9f03b4334234de02e82c9404cc2de981226b7..af2850d4f568cbc96f55c18980f246adc1c934bf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -26,14 +26,10 @@ import scala.language.reflectiveCalls
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.{PartitionDropOptions, TableType => HiveTableType}
+import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema}
-import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType}
-import org.apache.hadoop.hive.metastore.api.{NoSuchObjectException, PrincipalType}
-import org.apache.hadoop.hive.metastore.api.{ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
-import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.security.UserGroupInformation
@@ -41,13 +37,13 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.util.{CausedBy, CircularBuffer, Utils}
+import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
  * A class that wraps the HiveClient and converts its responses to externally visible classes.
@@ -400,11 +396,7 @@ private[hive] class HiveClientImpl(
       table: String,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = withHiveState {
-    val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
-    parts.foreach { s =>
-      addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
-    }
-    client.createPartitions(addPartitionDesc)
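+    // Delegate to the version-specific shim; Hive 0.12 and 0.13+ create partitions differently.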
+    shim.createPartitions(client, db, table, parts, ignoreIfExists)
   }
 
   override def dropPartitions(
@@ -430,10 +422,9 @@ private[hive] class HiveClientImpl(
       }.distinct
     var droppedParts = ArrayBuffer.empty[java.util.List[String]]
     matchingParts.foreach { partition =>
-      val dropOptions = new PartitionDropOptions
-      dropOptions.ifExists = ignoreIfNotExists
       try {
-        client.dropPartition(db, table, partition, dropOptions)
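+        // deleteData = true removes the partition's data along with its metadata.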
+        val deleteData = true
+        client.dropPartition(db, table, partition, deleteData)
       } catch {
         case e: Exception =>
           val remainingParts = matchingParts.toBuffer -- droppedParts
@@ -629,37 +620,28 @@ private[hive] class HiveClientImpl(
   }
 
   override def createFunction(db: String, func: CatalogFunction): Unit = withHiveState {
-    client.createFunction(toHiveFunction(func, db))
+    shim.createFunction(client, db, func)
   }
 
   override def dropFunction(db: String, name: String): Unit = withHiveState {
-    client.dropFunction(db, name)
+    shim.dropFunction(client, db, name)
   }
 
   override def renameFunction(db: String, oldName: String, newName: String): Unit = withHiveState {
-    val catalogFunc = getFunction(db, oldName)
-      .copy(identifier = FunctionIdentifier(newName, Some(db)))
-    val hiveFunc = toHiveFunction(catalogFunc, db)
-    client.alterFunction(db, oldName, hiveFunc)
+    shim.renameFunction(client, db, oldName, newName)
   }
 
   override def alterFunction(db: String, func: CatalogFunction): Unit = withHiveState {
-    client.alterFunction(db, func.identifier.funcName, toHiveFunction(func, db))
+    shim.alterFunction(client, db, func)
   }
 
   override def getFunctionOption(
-      db: String,
-      name: String): Option[CatalogFunction] = withHiveState {
-    try {
-      Option(client.getFunction(db, name)).map(fromHiveFunction)
-    } catch {
-      case CausedBy(ex: NoSuchObjectException) if ex.getMessage.contains(name) =>
-        None
-    }
+      db: String, name: String): Option[CatalogFunction] = withHiveState {
+    shim.getFunctionOption(client, db, name)
   }
 
   override def listFunctions(db: String, pattern: String): Seq[String] = withHiveState {
-    client.getFunctions(db, pattern).asScala
+    shim.listFunctions(client, db, pattern)
   }
 
   def addJar(path: String): Unit = {
@@ -708,36 +690,6 @@ private[hive] class HiveClientImpl(
     Utils.classForName(name)
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
-  private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
-    val resourceUris = f.resources.map { resource =>
-      new ResourceUri(
-        ResourceType.valueOf(resource.resourceType.resourceType.toUpperCase()), resource.uri)
-    }
-    new HiveFunction(
-      f.identifier.funcName,
-      db,
-      f.className,
-      null,
-      PrincipalType.USER,
-      (System.currentTimeMillis / 1000).toInt,
-      FunctionType.JAVA,
-      resourceUris.asJava)
-  }
-
-  private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
-    val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
-    val resources = hf.getResourceUris.asScala.map { uri =>
-      val resourceType = uri.getResourceType() match {
-        case ResourceType.ARCHIVE => "archive"
-        case ResourceType.FILE => "file"
-        case ResourceType.JAR => "jar"
-        case r => throw new AnalysisException(s"Unknown resource type: $r")
-      }
-      FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri())
-    }
-    new CatalogFunction(name, hf.getClassName, resources)
-  }
-
   private def toHiveColumn(c: CatalogColumn): FieldSchema = {
     new FieldSchema(c.name, c.dataType, c.comment.orNull)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 4ecf866f9639557b40d99a774421eb63f00a6022..78713c3f0bace241983ae2fdf816f4ace8a0ef5f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -27,15 +27,23 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, FunctionType}
+import org.apache.hadoop.hive.metastore.api.{NoSuchObjectException, PrincipalType}
+import org.apache.hadoop.hive.metastore.api.{ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
+import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
 import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde.serdeConstants
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{IntegralType, StringType}
+import org.apache.spark.util.CausedBy
 
 /**
  * A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used
@@ -73,6 +81,13 @@ private[client] sealed abstract class Shim {
 
   def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long
 
+  def createPartitions(
+      hive: Hive,
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit
+
   def loadPartition(
       hive: Hive,
       loadPath: Path,
@@ -100,6 +115,18 @@ private[client] sealed abstract class Shim {
       holdDDLTime: Boolean,
       listBucketingEnabled: Boolean): Unit
 
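+  // Permanent function operations, unsupported before Hive 0.13 (see Shim_v0_12).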
+  def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit
+
+  def dropFunction(hive: Hive, db: String, name: String): Unit
+
+  def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit
+
+  def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit
+
+  def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction]
+
+  def listFunctions(hive: Hive, db: String, pattern: String): Seq[String]
+
   def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit
 
   protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
@@ -112,7 +139,6 @@ private[client] sealed abstract class Shim {
   protected def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
     klass.getMethod(name, args: _*)
   }
-
 }
 
 private[client] class Shim_v0_12 extends Shim with Logging {
@@ -144,6 +170,22 @@ private[client] class Shim_v0_12 extends Shim with Logging {
       classOf[Driver],
       "getResults",
       classOf[JArrayList[String]])
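+  // Hive 0.12 adds partitions one at a time via Hive.createPartition, resolved reflectively here.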
+  private lazy val createPartitionMethod =
+    findMethod(
+      classOf[Hive],
+      "createPartition",
+      classOf[Table],
+      classOf[JMap[String, String]],
+      classOf[Path],
+      classOf[JMap[String, String]],
+      classOf[String],
+      classOf[String],
+      JInteger.TYPE,
+      classOf[JList[Object]],
+      classOf[String],
+      classOf[JMap[String, String]],
+      classOf[JList[Object]],
+      classOf[JList[Object]])
   private lazy val loadPartitionMethod =
     findMethod(
       classOf[Hive],
@@ -199,6 +241,42 @@ private[client] class Shim_v0_12 extends Shim with Logging {
   override def setDataLocation(table: Table, loc: String): Unit =
     setDataLocationMethod.invoke(table, new URI(loc))
 
+  // Follows exactly the same logic as DDLTask.createPartitions in Hive 0.12
+  override def createPartitions(
+      hive: Hive,
+      database: String,
+      tableName: String,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    val table = hive.getTable(database, tableName)
+    parts.foreach { s =>
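+      // Resolve the partition location relative to the table's root path, as DDLTask does.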
+      val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
+      val spec = s.spec.asJava
+      if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
+        // Ignore this partition since it already exists and ignoreIfExists == true
+      } else {
+        if (location == null && table.isView()) {
+          throw new HiveException("LOCATION clause illegal for view partition")
+        }
+
+        createPartitionMethod.invoke(
+          hive,
+          table,
+          spec,
+          location,
+          null, // partParams
+          null, // inputFormat
+          null, // outputFormat
+          -1: JInteger, // numBuckets
+          null, // cols
+          null, // serializationLib
+          null, // serdeParams
+          null, // bucketCols
+          null) // sortCols
+      }
+    }
+  }
+
   override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
     getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
 
@@ -265,6 +343,30 @@ private[client] class Shim_v0_12 extends Shim with Logging {
     dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean)
   }
 
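+  // Hive 0.12 has no permanent function support; these operations fail or return empty results.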
+  override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
+    throw new AnalysisException("Hive 0.12 doesn't support creating permanent functions. " +
+      "Please use Hive 0.13 or higher.")
+  }
+
+  override def dropFunction(hive: Hive, db: String, name: String): Unit = {
+    throw new NoSuchPermanentFunctionException(db, name)
+  }
+
+  override def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit = {
+    throw new NoSuchPermanentFunctionException(db, oldName)
+  }
+
+  override def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
+    throw new NoSuchPermanentFunctionException(db, func.identifier.funcName)
+  }
+
+  override def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] = {
+    None
+  }
+
+  override def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] = {
+    Seq.empty[String]
+  }
 }
 
 private[client] class Shim_v0_13 extends Shim_v0_12 {
@@ -308,9 +410,85 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
   override def setDataLocation(table: Table, loc: String): Unit =
     setDataLocationMethod.invoke(table, new Path(loc))
 
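+  // Hive 0.13+ supports batch partition creation through AddPartitionDesc.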
+  override def createPartitions(
+      hive: Hive,
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
+    parts.foreach { s =>
+      addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
+    }
+    hive.createPartitions(addPartitionDesc)
+  }
+
   override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
     getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
 
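+  // Converts a Spark CatalogFunction into Hive's metastore Function representation.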
+  private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = {
+    val resourceUris = f.resources.map { resource =>
+      new ResourceUri(
+        ResourceType.valueOf(resource.resourceType.resourceType.toUpperCase()), resource.uri)
+    }
+    new HiveFunction(
+      f.identifier.funcName,
+      db,
+      f.className,
+      null,
+      PrincipalType.USER,
+      (System.currentTimeMillis / 1000).toInt,
+      FunctionType.JAVA,
+      resourceUris.asJava)
+  }
+
+  override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
+    hive.createFunction(toHiveFunction(func, db))
+  }
+
+  override def dropFunction(hive: Hive, db: String, name: String): Unit = {
+    hive.dropFunction(db, name)
+  }
+
+  override def renameFunction(hive: Hive, db: String, oldName: String, newName: String): Unit = {
+    val catalogFunc = getFunctionOption(hive, db, oldName)
+      .getOrElse(throw new NoSuchPermanentFunctionException(db, oldName))
+      .copy(identifier = FunctionIdentifier(newName, Some(db)))
+    val hiveFunc = toHiveFunction(catalogFunc, db)
+    hive.alterFunction(db, oldName, hiveFunc)
+  }
+
+  override def alterFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
+    hive.alterFunction(db, func.identifier.funcName, toHiveFunction(func, db))
+  }
+
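+  // Converts a Hive metastore Function back into a Spark CatalogFunction.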
+  private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
+    val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
+    val resources = hf.getResourceUris.asScala.map { uri =>
+      val resourceType = uri.getResourceType() match {
+        case ResourceType.ARCHIVE => "archive"
+        case ResourceType.FILE => "file"
+        case ResourceType.JAR => "jar"
+        case r => throw new AnalysisException(s"Unknown resource type: $r")
+      }
+      FunctionResource(FunctionResourceType.fromString(resourceType), uri.getUri())
+    }
+    new CatalogFunction(name, hf.getClassName, resources)
+  }
+
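+  // Hive may wrap NoSuchObjectException in another exception, hence the CausedBy extractor.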
+  override def getFunctionOption(hive: Hive, db: String, name: String): Option[CatalogFunction] = {
+    try {
+      Option(hive.getFunction(db, name)).map(fromHiveFunction)
+    } catch {
+      case CausedBy(ex: NoSuchObjectException) if ex.getMessage.contains(name) =>
+        None
+    }
+  }
+
+  override def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] = {
+    hive.getFunctions(db, pattern).asScala
+  }
+
   /**
    * Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e.
    * a string that represents partition predicates like "str_key=\"value\" and int_key=1 ...".
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index a6a5ab3988fc9546a61e0914e516f8127727afc1..57e8db7e88fb5393a0fa49ced9b42ecabcdb88bf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -17,21 +17,27 @@
 
 package org.apache.spark.sql.hive.client
 
-import java.io.File
+import java.io.{ByteArrayOutputStream, File, PrintStream}
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
+import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.tags.ExtendedHiveTest
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
 /**
  * A simple set of tests that call the methods of a [[HiveClient]], loading different version
@@ -97,12 +103,6 @@ class VersionsSuite extends SparkFunSuite with Logging {
 
   private val emptyDir = Utils.createTempDir().getCanonicalPath
 
-  private def partSpec = {
-    val hashMap = new java.util.LinkedHashMap[String, String]
-    hashMap.put("key", "1")
-    hashMap
-  }
-
   // It's actually pretty easy to mess things up and have all of your tests "pass" by accidentally
   // connecting to an auto-populated, in-process metastore.  Let's make sure we are getting the
   // versions right by forcing a known compatibility failure.
@@ -122,7 +122,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
     assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
   }
 
-  private val versions = Seq("12", "13", "14", "1.0.0", "1.1.0", "1.2.0")
+  private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2")
 
   private var client: HiveClient = null
 
@@ -130,110 +130,402 @@ class VersionsSuite extends SparkFunSuite with Logging {
     test(s"$version: create client") {
       client = null
       System.gc() // Hack to avoid SEGV on some JVM versions.
+      val hadoopConf = new Configuration()
+      hadoopConf.set("test", "success")
       client =
         IsolatedClientLoader.forVersion(
           hiveMetastoreVersion = version,
           hadoopVersion = VersionInfo.getVersion,
           sparkConf = sparkConf,
-          hadoopConf = new Configuration(),
+          hadoopConf,
           config = buildConf(),
           ivyPath = ivyPath).createClient()
     }
 
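+    // Builds a simple managed table with a single int column, stored as plain text.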
+    def table(database: String, tableName: String): CatalogTable = {
+      CatalogTable(
+        identifier = TableIdentifier(tableName, Some(database)),
+        tableType = CatalogTableType.MANAGED,
+        schema = Seq(CatalogColumn("key", "int")),
+        storage = CatalogStorageFormat(
+          locationUri = None,
+          inputFormat = Some(classOf[TextInputFormat].getName),
+          outputFormat = Some(classOf[HiveIgnoreKeyTextOutputFormat[_, _]].getName),
+          serde = Some(classOf[LazySimpleSerDe].getName()),
+          compressed = false,
+          serdeProperties = Map.empty
+        ))
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // Database related API
+    ///////////////////////////////////////////////////////////////////////////
+
+    val tempDatabasePath = Utils.createTempDir().getCanonicalPath
+
     test(s"$version: createDatabase") {
-      val db = CatalogDatabase("default", "desc", "loc", Map())
-      client.createDatabase(db, ignoreIfExists = true)
+      val defaultDB = CatalogDatabase("default", "desc", "loc", Map())
+      client.createDatabase(defaultDB, ignoreIfExists = true)
+      val tempDB = CatalogDatabase(
+        "temporary", description = "test create", tempDatabasePath, Map())
+      client.createDatabase(tempDB, ignoreIfExists = true)
+    }
+
+    test(s"$version: setCurrentDatabase") {
+      client.setCurrentDatabase("default")
+    }
+
+    test(s"$version: getDatabase") {
+      // No exception should be thrown
+      client.getDatabase("default")
+    }
+
+    test(s"$version: getDatabaseOption") {
+      assert(client.getDatabaseOption("default").isDefined)
+      assert(client.getDatabaseOption("nonexist").isEmpty)
     }
 
+    test(s"$version: listDatabases") {
+      assert(client.listDatabases("defau.*") == Seq("default"))
+    }
+
+    test(s"$version: alterDatabase") {
+      val database = client.getDatabase("temporary").copy(properties = Map("flag" -> "true"))
+      client.alterDatabase(database)
+      assert(client.getDatabase("temporary").properties.contains("flag"))
+    }
+
+    test(s"$version: dropDatabase") {
+      assert(client.getDatabaseOption("temporary").isDefined)
+      client.dropDatabase("temporary", ignoreIfNotExists = false, cascade = true)
+      assert(client.getDatabaseOption("temporary").isEmpty)
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // Table related API
+    ///////////////////////////////////////////////////////////////////////////
+
     test(s"$version: createTable") {
-      val table =
-        CatalogTable(
-          identifier = TableIdentifier("src", Some("default")),
-          tableType = CatalogTableType.MANAGED,
-          schema = Seq(CatalogColumn("key", "int")),
-          storage = CatalogStorageFormat(
-            locationUri = None,
-            inputFormat = Some(classOf[org.apache.hadoop.mapred.TextInputFormat].getName),
-            outputFormat = Some(
-              classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName),
-            serde = Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()),
-            compressed = false,
-            serdeProperties = Map.empty
-          ))
-
-      client.createTable(table, ignoreIfExists = false)
+      client.createTable(table("default", tableName = "src"), ignoreIfExists = false)
+      client.createTable(table("default", "temporary"), ignoreIfExists = false)
+    }
+
+    test(s"$version: loadTable") {
+      client.loadTable(
+        emptyDir,
+        tableName = "src",
+        replace = false,
+        holdDDLTime = false)
     }
 
     test(s"$version: getTable") {
+      // No exception should be thrown
       client.getTable("default", "src")
     }
 
-    test(s"$version: listTables") {
-      assert(client.listTables("default") === Seq("src"))
+    test(s"$version: getTableOption") {
+      assert(client.getTableOption("default", "src").isDefined)
     }
 
-    test(s"$version: getDatabase") {
-      client.getDatabase("default")
+    test(s"$version: alterTable(table: CatalogTable)") {
+      val newTable = client.getTable("default", "src").copy(properties = Map("changed" -> ""))
+      client.alterTable(newTable)
+      assert(client.getTable("default", "src").properties.contains("changed"))
     }
 
-    test(s"$version: alterTable") {
-      client.alterTable(client.getTable("default", "src"))
+    test(s"$version: alterTable(tableName: String, table: CatalogTable)") {
+      val newTable = client.getTable("default", "src").copy(properties = Map("changedAgain" -> ""))
+      client.alterTable("src", newTable)
+      assert(client.getTable("default", "src").properties.contains("changedAgain"))
     }
 
-    test(s"$version: set command") {
-      client.runSqlHive("SET spark.sql.test.key=1")
+    test(s"$version: listTables(database)") {
+      assert(client.listTables("default") === Seq("src", "temporary"))
+    }
+
+    test(s"$version: listTables(database, pattern)") {
+      assert(client.listTables("default", pattern = "src") === Seq("src"))
+      assert(client.listTables("default", pattern = "nonexist").isEmpty)
+    }
+
+    test(s"$version: dropTable") {
+      client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false)
+      assert(client.listTables("default") === Seq("src"))
     }
 
-    test(s"$version: create partitioned table DDL") {
-      client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key INT)")
-      client.runSqlHive("ALTER TABLE src_part ADD PARTITION (key = '1')")
+    ///////////////////////////////////////////////////////////////////////////
+    // Partition related API
+    ///////////////////////////////////////////////////////////////////////////
+
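+    // A minimal storage descriptor with no explicit location or format.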
+    val storageFormat = CatalogStorageFormat(
+      locationUri = None,
+      inputFormat = None,
+      outputFormat = None,
+      serde = None,
+      compressed = false,
+      serdeProperties = Map.empty)
+
+    test(s"$version: sql create partitioned table") {
+      client.runSqlHive("CREATE TABLE src_part (value INT) PARTITIONED BY (key1 INT, key2 INT)")
     }
 
-    test(s"$version: getPartitions") {
-      client.getPartitions(client.getTable("default", "src_part"))
+    test(s"$version: createPartitions") {
+      val partition1 = CatalogTablePartition(Map("key1" -> "1", "key2" -> "1"), storageFormat)
+      val partition2 = CatalogTablePartition(Map("key1" -> "1", "key2" -> "2"), storageFormat)
+      client.createPartitions(
+        "default", "src_part", Seq(partition1, partition2), ignoreIfExists = true)
+    }
+
+    test(s"$version: getPartitions(catalogTable)") {
+      assert(2 == client.getPartitions(client.getTable("default", "src_part")).size)
     }
 
     test(s"$version: getPartitionsByFilter") {
-      client.getPartitionsByFilter(client.getTable("default", "src_part"), Seq(EqualTo(
-        AttributeReference("key", IntegerType, false)(NamedExpression.newExprId),
-        Literal(1))))
+      // Only one partition [1, 1] for key2 == 1
+      val result = client.getPartitionsByFilter(client.getTable("default", "src_part"),
+        Seq(EqualTo(AttributeReference("key2", IntegerType)(), Literal(1))))
+
+      // Hive 0.12 doesn't support getPartitionsByFilter; its shim returns all partitions.
+      if (version != "0.12") {
+        assert(result.size == 1)
+      }
+    }
+
+    test(s"$version: getPartition") {
+      // No exception should be thrown
+      client.getPartition("default", "src_part", Map("key1" -> "1", "key2" -> "2"))
+    }
+
+    test(s"$version: getPartitionOption(db: String, table: String, spec: TablePartitionSpec)") {
+      val partition = client.getPartitionOption(
+        "default", "src_part", Map("key1" -> "1", "key2" -> "2"))
+      assert(partition.isDefined)
+    }
+
+    test(s"$version: getPartitionOption(table: CatalogTable, spec: TablePartitionSpec)") {
+      val partition = client.getPartitionOption(
+        client.getTable("default", "src_part"), Map("key1" -> "1", "key2" -> "2"))
+      assert(partition.isDefined)
+    }
+
+    test(s"$version: getPartitions(db: String, table: String)") {
+      assert(2 == client.getPartitions("default", "src_part", None).size)
     }
 
     test(s"$version: loadPartition") {
+      val partSpec = new java.util.LinkedHashMap[String, String]
+      partSpec.put("key1", "1")
+      partSpec.put("key2", "2")
+
       client.loadPartition(
         emptyDir,
         "default.src_part",
         partSpec,
-        false,
-        false,
-        false,
-        false)
-    }
-
-    test(s"$version: loadTable") {
-      client.loadTable(
-        emptyDir,
-        "src",
-        false,
-        false)
+        replace = false,
+        holdDDLTime = false,
+        inheritTableSpecs = false,
+        isSkewedStoreAsSubdir = false)
     }
 
     test(s"$version: loadDynamicPartitions") {
+      val partSpec = new java.util.LinkedHashMap[String, String]
+      partSpec.put("key1", "1")
+      partSpec.put("key2", "") // Dynamic partition
+
       client.loadDynamicPartitions(
         emptyDir,
         "default.src_part",
         partSpec,
-        false,
-        1,
+        replace = false,
+        numDP = 1,
         false,
         false)
     }
 
-    test(s"$version: create index and reset") {
+    test(s"$version: renamePartitions") {
+      val oldSpec = Map("key1" -> "1", "key2" -> "1")
+      val newSpec = Map("key1" -> "1", "key2" -> "3")
+      client.renamePartitions("default", "src_part", Seq(oldSpec), Seq(newSpec))
+
+      // Checks the existence of the new partition (key1 = 1, key2 = 3)
+      assert(client.getPartitionOption("default", "src_part", newSpec).isDefined)
+    }
+
+    test(s"$version: alterPartitions") {
+      val spec = Map("key1" -> "1", "key2" -> "2")
+      val newLocation = Utils.createTempDir().getPath()
+      val storage = storageFormat.copy(locationUri = Some(newLocation))
+      val partition = CatalogTablePartition(spec, storage)
+      client.alterPartitions("default", "src_part", Seq(partition))
+      assert(client.getPartition("default", "src_part", spec)
+        .storage.locationUri == Some(newLocation))
+    }
+
+    test(s"$version: dropPartitions") {
+      val spec = Map("key1" -> "1", "key2" -> "3")
+      client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true)
+      assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // Function related API
+    ///////////////////////////////////////////////////////////////////////////
+
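+    // Builds a CatalogFunction in the default database with no attached resources.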
+    def function(name: String, className: String): CatalogFunction = {
+      CatalogFunction(
+        FunctionIdentifier(name, Some("default")), className, Seq.empty[FunctionResource])
+    }
+
+    test(s"$version: createFunction") {
+      val functionClass = "org.apache.spark.MyFunc1"
+      if (version == "0.12") {
+        // Hive 0.12 doesn't support creating permanent functions
+        intercept[AnalysisException] {
+          client.createFunction("default", function("func1", functionClass))
+        }
+      } else {
+        client.createFunction("default", function("func1", functionClass))
+      }
+    }
+
+    test(s"$version: functionExists") {
+      if (version == "0.12") {
+        // Hive 0.12 doesn't allow customized permanent functions
+        assert(!client.functionExists("default", "func1"))
+      } else {
+        assert(client.functionExists("default", "func1"))
+      }
+    }
+
+    test(s"$version: renameFunction") {
+      if (version == "0.12") {
+        // Hive 0.12 doesn't allow customized permanent functions
+        intercept[NoSuchPermanentFunctionException] {
+          client.renameFunction("default", "func1", "func2")
+        }
+      } else {
+        client.renameFunction("default", "func1", "func2")
+        assert(client.functionExists("default", "func2"))
+      }
+    }
+
+    test(s"$version: alterFunction") {
+      val functionClass = "org.apache.spark.MyFunc2"
+      if (version == "0.12") {
+        // Hive 0.12 doesn't allow customized permanent functions
+        intercept[NoSuchPermanentFunctionException] {
+          client.alterFunction("default", function("func2", functionClass))
+        }
+      } else {
+        client.alterFunction("default", function("func2", functionClass))
+      }
+    }
+
+    test(s"$version: getFunction") {
+      if (version == "0.12") {
+        // Hive 0.12 doesn't allow customized permanent functions
+        intercept[NoSuchPermanentFunctionException] {
+          client.getFunction("default", "func2")
+        }
+      } else {
+        // No exception should be thrown
+        val func = client.getFunction("default", "func2")
+        assert(func.className == "org.apache.spark.MyFunc2")
+      }
+    }
+
+    test(s"$version: getFunctionOption") {
+      if (version == "0.12") {
+        // Hive 0.12 doesn't allow customized permanent functions
+        assert(client.getFunctionOption("default", "func2").isEmpty)
+      } else {
+        assert(client.getFunctionOption("default", "func2").isDefined)
+      }
+    }
+
+    test(s"$version: listFunctions") {
+      if (version == "0.12") {
+        // Hive 0.12 doesn't allow customized permanent functions
+        assert(client.listFunctions("default", "fun.*").isEmpty)
+      } else {
+        assert(client.listFunctions("default", "fun.*").size == 1)
+      }
+    }
+
+    test(s"$version: dropFunction") {
+      if (version == "0.12") {
+        // Hive 0.12 doesn't support creating permanent functions
+        intercept[NoSuchPermanentFunctionException] {
+          client.dropFunction("default", "func2")
+        }
+      } else {
+        // No exception should be thrown
+        client.dropFunction("default", "func2")
+        assert(client.listFunctions("default", "fun.*").isEmpty)
+      }
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // SQL related API
+    ///////////////////////////////////////////////////////////////////////////
+
+    test(s"$version: sql set command") {
+      client.runSqlHive("SET spark.sql.test.key=1")
+    }
+
+    test(s"$version: sql create index and reset") {
       client.runSqlHive("CREATE TABLE indexed_table (key INT)")
       client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
         "as 'COMPACT' WITH DEFERRED REBUILD")
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+    // Miscellaneous API
+    ///////////////////////////////////////////////////////////////////////////
+
+    test(s"$version: version") {
+      assert(client.version.fullVersion.startsWith(version))
+    }
+
+    test(s"$version: getConf") {
+      assert("success" === client.getConf("test", null))
+    }
+
+    test(s"$version: setOut") {
+      client.setOut(new PrintStream(new ByteArrayOutputStream()))
+    }
+
+    test(s"$version: setInfo") {
+      client.setInfo(new PrintStream(new ByteArrayOutputStream()))
+    }
+
+    test(s"$version: setError") {
+      client.setError(new PrintStream(new ByteArrayOutputStream()))
+    }
+
+    test(s"$version: newSession") {
+      val newClient = client.newSession()
+      assert(newClient != null)
+    }
+
+    test(s"$version: withHiveState and addJar") {
+      val newClassPath = "."
+      client.addJar(newClassPath)
+      client.withHiveState {
+        // withHiveState sets the context classloader to the MutableURLClassLoader
+        // that addJar updates, so the added path should appear among its URLs.
+        val classLoader = Thread.currentThread().getContextClassLoader
+          .asInstanceOf[MutableURLClassLoader]
+
+        val urls = classLoader.getURLs()
+        assert(urls.contains(new File(newClassPath).toURI.toURL))
+      }
+    }
+
+    test(s"$version: reset") {
+      // Clears all databases, tables, functions...
       client.reset()
+      assert(client.listTables("default").isEmpty)
     }
   }
 }