diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 2f0feee0efa8c3d6f78cfe279a5a1e8f4b1f8893..23777f2fe35adb4405b1ae506e8d2816d8327579 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.IOException
+import java.lang.reflect.InvocationTargetException
 import java.net.URI
 import java.util
 
@@ -68,7 +69,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   // Exceptions thrown by the hive client that we would like to wrap
   private val clientExceptions = Set(
     classOf[HiveException].getCanonicalName,
-    classOf[TException].getCanonicalName)
+    classOf[TException].getCanonicalName,
+    classOf[InvocationTargetException].getCanonicalName)
 
   /**
    * Whether this is an exception thrown by the hive client that should be wrapped.
@@ -94,7 +96,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     try {
       body
     } catch {
-      case NonFatal(e) if isClientException(e) =>
+      case NonFatal(exception) if isClientException(exception) =>
+        val e = exception match {
+          // The shims call Hive through reflection; Method.invoke() wraps any
+          // exception from the underlying method in an InvocationTargetException.
+          case i: InvocationTargetException => i.getCause
+          case o => o
+        }
         throw new AnalysisException(
           e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
     }
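
For context, a minimal self-contained sketch of the behavior this hunk relies on: Method.invoke() wraps whatever the callee throws in an InvocationTargetException, so the real error is the cause. The names here (UnwrapDemo, riskyCall) are hypothetical and not part of the Spark codebase.

    import java.lang.reflect.{InvocationTargetException, Method}

    object UnwrapDemo {
      // Called reflectively below; throws, as the Hive client methods can.
      def riskyCall(): Unit = throw new IllegalStateException("table not found")

      def main(args: Array[String]): Unit = {
        val m: Method = UnwrapDemo.getClass.getMethod("riskyCall")
        try {
          m.invoke(UnwrapDemo) // wraps the IllegalStateException
        } catch {
          case i: InvocationTargetException =>
            // The interesting error is the cause, not the reflection wrapper.
            val e = i.getCause
            println(e.getClass.getCanonicalName + ": " + e.getMessage)
        }
      }
    }
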
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index dd8e5c6da08c9dd6e362d6c9332ee07dee53487e..64be1ed96da6b003202f34658c41992911d77dbc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -749,12 +749,8 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
       deleteData: Boolean,
       ignoreIfNotExists: Boolean,
       purge: Boolean): Unit = {
-    try {
-      dropTableMethod.invoke(hive, dbName, tableName, deleteData: JBoolean,
-        ignoreIfNotExists: JBoolean, purge: JBoolean)
-    } catch {
-      case e: InvocationTargetException => throw e.getCause()
-    }
+    dropTableMethod.invoke(hive, dbName, tableName, deleteData: JBoolean,
+      ignoreIfNotExists: JBoolean, purge: JBoolean)
   }
 
   override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
@@ -847,11 +843,7 @@ private[client] class Shim_v1_2 extends Shim_v1_1 {
     val dropOptions = dropOptionsClass.newInstance().asInstanceOf[Object]
     dropOptionsDeleteData.setBoolean(dropOptions, deleteData)
     dropOptionsPurge.setBoolean(dropOptions, purge)
-    try {
-      dropPartitionMethod.invoke(hive, dbName, tableName, part, dropOptions)
-    } catch {
-      case e: InvocationTargetException => throw e.getCause()
-    }
+    dropPartitionMethod.invoke(hive, dbName, tableName, part, dropOptions)
   }
 
 }
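
With the per-shim try/catch removed, every reflective call now leans on the catalog-level wrapper instead of unwrapping locally. A condensed sketch of the resulting flow, with AnalysisException stubbed as a plain exception and isClientException simplified to a single check (the real versions live in org.apache.spark.sql and HiveExternalCatalog):

    import java.lang.reflect.InvocationTargetException
    import scala.util.control.NonFatal

    object WithClientSketch {
      // Stand-in for org.apache.spark.sql.AnalysisException.
      class AnalysisException(msg: String, cause: Throwable)
        extends Exception(msg, cause)

      // Simplified predicate; the real one checks a set of class names.
      private def isClientException(e: Throwable): Boolean =
        e.isInstanceOf[InvocationTargetException]

      // Mirrors the patched withClient: unwrap first, then wrap uniformly.
      def withClient[T](body: => T): T = {
        try body catch {
          case NonFatal(exception) if isClientException(exception) =>
            val e = exception match {
              case i: InvocationTargetException => i.getCause
              case o => o
            }
            throw new AnalysisException(
              e.getClass.getCanonicalName + ": " + e.getMessage, e)
        }
      }
    }
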