diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 8a4264194ae8d8ae171529ee8f22ab11d26f6cad..e83941c2ecf66a584c3f82fe5bd1ed9afaecc4ea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
+import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SQLConf.SQLConfEntry
@@ -288,7 +289,8 @@ class HiveContext private[hive](
       logInfo(
         s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
       IsolatedClientLoader.forVersion(
-        version = hiveMetastoreVersion,
+        hiveMetastoreVersion = hiveMetastoreVersion,
+        hadoopVersion = VersionInfo.getVersion,
         config = allConfig,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
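
For reference, org.apache.hadoop.util.VersionInfo.getVersion reports the version of the
Hadoop artifacts on the current classpath, which is what the call above now forwards to
IsolatedClientLoader.forVersion. A minimal sketch, assuming only that hadoop-common is on
the classpath:

    import org.apache.hadoop.util.VersionInfo

    object PrintHadoopVersion {
      def main(args: Array[String]): Unit = {
        // Prints e.g. "2.4.0", or a vendor build such as "2.0.0-cdh4.1.1".
        println(s"Hadoop version on classpath: ${VersionInfo.getVersion}")
      }
    }
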
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index e041e0d8e5ae83c893583987497a5945e8f0737b..010051d255fdc37f6b2e932e9c918190f56addf5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -34,23 +34,51 @@ import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
 /** Factory for `IsolatedClientLoader` with specific versions of hive. */
-private[hive] object IsolatedClientLoader {
+private[hive] object IsolatedClientLoader extends Logging {
   /**
    * Creates isolated Hive client loaders by downloading the requested version from maven.
    */
   def forVersion(
-      version: String,
+      hiveMetastoreVersion: String,
+      hadoopVersion: String,
       config: Map[String, String] = Map.empty,
       ivyPath: Option[String] = None,
       sharedPrefixes: Seq[String] = Seq.empty,
       barrierPrefixes: Seq[String] = Seq.empty): IsolatedClientLoader = synchronized {
-    val resolvedVersion = hiveVersion(version)
-    val files = resolvedVersions.getOrElseUpdate(resolvedVersion,
-      downloadVersion(resolvedVersion, ivyPath))
+    val resolvedVersion = hiveVersion(hiveMetastoreVersion)
+    // We first try to share Hadoop classes. If the Hadoop artifact for the given version
+    // cannot be resolved, we fall back to Hadoop 2.4.0 and stop sharing Hadoop classes.
+    var sharesHadoopClasses = true
+    val files = if (resolvedVersions.contains((resolvedVersion, hadoopVersion))) {
+      resolvedVersions((resolvedVersion, hadoopVersion))
+    } else {
+      val (downloadedFiles, actualHadoopVersion) =
+        try {
+          (downloadVersion(resolvedVersion, hadoopVersion, ivyPath), hadoopVersion)
+        } catch {
+          case e: RuntimeException if e.getMessage.contains("hadoop") =>
+            // If the error message mentions hadoop, the Hadoop artifact for this version
+            // probably cannot be resolved (e.g. it is a vendor-specific version like
+            // 2.0.0-cdh4.1.1). In that case, retry with
+            // "org.apache.hadoop:hadoop-client:2.4.0", which was previously hard-coded
+            // as the Hadoop artifact to download.
+            logWarning(s"Failed to resolve Hadoop artifacts for version ${hadoopVersion}. " +
+              s"Falling back from Hadoop version ${hadoopVersion} to 2.4.0 and trying again. " +
+              "Hadoop classes will not be shared between Spark and the Hive metastore client. " +
+              "In production environments, it is recommended to set the jars used by the " +
+              "Hive metastore client via spark.sql.hive.metastore.jars.")
+            sharesHadoopClasses = false
+            (downloadVersion(resolvedVersion, "2.4.0", ivyPath), "2.4.0")
+        }
+      resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles)
+      downloadedFiles
+    }
+
     new IsolatedClientLoader(
-      version = hiveVersion(version),
+      version = hiveVersion(hiveMetastoreVersion),
       execJars = files,
       config = config,
+      sharesHadoopClasses = sharesHadoopClasses,
       sharedPrefixes = sharedPrefixes,
       barrierPrefixes = barrierPrefixes)
   }
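
The fallback above is a try-then-retry-with-default pattern: attempt resolution with the
reported Hadoop version, and on failure retry with 2.4.0 while recording that Hadoop classes
can no longer be shared. A standalone sketch of the same shape; resolve is a hypothetical
stand-in for downloadVersion:

    // Hypothetical resolver standing in for downloadVersion, for illustration only.
    def resolve(hadoopVersion: String): Seq[String] =
      if (hadoopVersion.contains("cdh")) {
        // Vendor-specific builds are typically absent from Maven Central.
        throw new RuntimeException(s"unresolved dependency: hadoop-client:$hadoopVersion")
      } else {
        Seq(s"org.apache.hadoop:hadoop-client:$hadoopVersion")
      }

    // Returns the resolved artifacts, the Hadoop version actually used, and
    // whether Hadoop classes may still be shared with Spark.
    def resolveWithFallback(requested: String): (Seq[String], String, Boolean) =
      try {
        (resolve(requested), requested, true)
      } catch {
        case e: RuntimeException if e.getMessage.contains("hadoop") =>
          (resolve("2.4.0"), "2.4.0", false)
      }

    assert(resolveWithFallback("2.6.0")._3)            // resolved; classes shared
    assert(!resolveWithFallback("2.0.0-cdh4.1.1")._3)  // fell back; not shared
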
@@ -64,12 +92,15 @@ private[hive] object IsolatedClientLoader {
     case "1.2" | "1.2.0" | "1.2.1" => hive.v1_2
   }
 
-  private def downloadVersion(version: HiveVersion, ivyPath: Option[String]): Seq[URL] = {
+  private def downloadVersion(
+      version: HiveVersion,
+      hadoopVersion: String,
+      ivyPath: Option[String]): Seq[URL] = {
     val hiveArtifacts = version.extraDeps ++
       Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde")
         .map(a => s"org.apache.hive:$a:${version.fullVersion}") ++
       Seq("com.google.guava:guava:14.0.1",
-        "org.apache.hadoop:hadoop-client:2.4.0")
+        s"org.apache.hadoop:hadoop-client:$hadoopVersion")
 
     val classpath = quietly {
       SparkSubmitUtils.resolveMavenCoordinates(
@@ -86,7 +117,10 @@ private[hive] object IsolatedClientLoader {
     tempDir.listFiles().map(_.toURI.toURL)
   }
 
-  private def resolvedVersions = new scala.collection.mutable.HashMap[HiveVersion, Seq[URL]]
+  // Maps a (HiveVersion, Hadoop version) pair to the resolved jar files.
+  // It is only used by forVersion.
+  private val resolvedVersions =
+    new scala.collection.mutable.HashMap[(HiveVersion, String), Seq[URL]]
 }
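
With the change to downloadVersion, the resolved Maven coordinates now track the caller's
Hadoop version instead of a hard-coded 2.4.0. For example, for Hive 1.2.1 on Hadoop 2.6.0
the request would look roughly like this (illustrative; version.extraDeps omitted):

    val hiveArtifacts = Seq(
      "org.apache.hive:hive-metastore:1.2.1",
      "org.apache.hive:hive-exec:1.2.1",
      "org.apache.hive:hive-common:1.2.1",
      "org.apache.hive:hive-serde:1.2.1",
      "com.google.guava:guava:14.0.1",
      "org.apache.hadoop:hadoop-client:2.6.0") // was always 2.4.0 before this change

Keying resolvedVersions on the (HiveVersion, Hadoop version) pair keeps the download cache
correct now that the same Hive version can be paired with different Hadoop artifact sets.
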
 
 /**
@@ -106,6 +140,7 @@ private[hive] object IsolatedClientLoader {
  * @param config   A set of options that will be added to the HiveConf of the constructed client.
  * @param isolationOn When true, custom versions of barrier classes will be constructed.  Must be
  *                    true unless loading the version of hive that is on Sparks classloader.
+ * @param sharesHadoopClasses When true, Hadoop classes will be shared between Spark and the Hive client.
  * @param rootClassLoader The system root classloader. Must not know about Hive classes.
  * @param baseClassLoader The spark classloader that is used to load shared classes.
  */
@@ -114,6 +149,7 @@ private[hive] class IsolatedClientLoader(
     val execJars: Seq[URL] = Seq.empty,
     val config: Map[String, String] = Map.empty,
     val isolationOn: Boolean = true,
+    val sharesHadoopClasses: Boolean = true,
     val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
     val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
     val sharedPrefixes: Seq[String] = Seq.empty,
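
Because sharesHadoopClasses defaults to true, existing direct constructions of the loader
compile and behave as before; only forVersion's fallback path opts out. A simplified
stand-in showing that compatibility property (hypothetical class, not the real loader):

    // Hypothetical stand-in mirroring the new parameter's default.
    class Loader(
        val isolationOn: Boolean = true,
        val sharesHadoopClasses: Boolean = true)

    val legacy = new Loader(isolationOn = true)          // pre-change call site
    assert(legacy.sharesHadoopClasses)                   // old behavior preserved
    val fallback = new Loader(sharesHadoopClasses = false)
    assert(!fallback.sharesHadoopClasses)
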
@@ -126,16 +162,20 @@ private[hive] class IsolatedClientLoader(
   /** All jars used by the hive specific classloader. */
   protected def allJars = execJars.toArray
 
-  protected def isSharedClass(name: String): Boolean =
+  protected def isSharedClass(name: String): Boolean = {
+    val isHadoopClass =
+      name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.")
+
     name.contains("slf4j") ||
     name.contains("log4j") ||
     name.startsWith("org.apache.spark.") ||
-    (name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.")) ||
+    (sharesHadoopClasses && isHadoopClass) ||
     name.startsWith("scala.") ||
     (name.startsWith("com.google") && !name.startsWith("com.google.cloud")) ||
     name.startsWith("java.lang.") ||
     name.startsWith("java.net") ||
     sharedPrefixes.exists(name.startsWith)
+  }
 
   /** True if `name` refers to a spark class that must see specific version of Hive. */
   protected def isBarrierClass(name: String): Boolean =
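
To make the predicate's new behavior concrete, here is a standalone copy of the logic above
(shared and barrier prefixes left empty) with a few spot checks:

    def isSharedClass(name: String, sharesHadoopClasses: Boolean): Boolean = {
      val isHadoopClass =
        name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.")
      name.contains("slf4j") ||
      name.contains("log4j") ||
      name.startsWith("org.apache.spark.") ||
      (sharesHadoopClasses && isHadoopClass) ||
      name.startsWith("scala.") ||
      (name.startsWith("com.google") && !name.startsWith("com.google.cloud")) ||
      name.startsWith("java.lang.") ||
      name.startsWith("java.net")
    }

    // Hadoop classes are shared only when sharesHadoopClasses is true...
    assert(isSharedClass("org.apache.hadoop.fs.Path", sharesHadoopClasses = true))
    assert(!isSharedClass("org.apache.hadoop.fs.Path", sharesHadoopClasses = false))
    // ...and Hive's own org.apache.hadoop.hive classes are never shared this way.
    assert(!isSharedClass("org.apache.hadoop.hive.ql.metadata.Table", sharesHadoopClasses = true))
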
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 7bc13bc60d30e8763ac2b020a3cb9898df357be4..502b240f3650f58d4bb0f54ad9f8d50d426d116f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.client
 
 import java.io.File
 
+import org.apache.hadoop.util.VersionInfo
+
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{Logging, SparkFunSuite}
 import org.apache.spark.sql.catalyst.expressions.{NamedExpression, Literal, AttributeReference, EqualTo}
@@ -53,9 +55,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
   }
 
   test("success sanity check") {
-    val badClient = IsolatedClientLoader.forVersion(HiveContext.hiveExecutionVersion,
-      buildConf(),
-      ivyPath).createClient()
+    val badClient = IsolatedClientLoader.forVersion(
+      hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
+      hadoopVersion = VersionInfo.getVersion,
+      config = buildConf(),
+      ivyPath = ivyPath).createClient()
     val db = new HiveDatabase("default", "")
     badClient.createDatabase(db)
   }
@@ -85,7 +89,11 @@ class VersionsSuite extends SparkFunSuite with Logging {
   ignore("failure sanity check") {
     val e = intercept[Throwable] {
       val badClient = quietly {
-        IsolatedClientLoader.forVersion("13", buildConf(), ivyPath).createClient()
+        IsolatedClientLoader.forVersion(
+          hiveMetastoreVersion = "13",
+          hadoopVersion = VersionInfo.getVersion,
+          config = buildConf(),
+          ivyPath = ivyPath).createClient()
       }
     }
     assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
@@ -99,7 +107,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
     test(s"$version: create client") {
       client = null
       System.gc() // Hack to avoid SEGV on some JVM versions.
-      client = IsolatedClientLoader.forVersion(version, buildConf(), ivyPath).createClient()
+      client =
+        IsolatedClientLoader.forVersion(
+          hiveMetastoreVersion = version,
+          hadoopVersion = VersionInfo.getVersion,
+          config = buildConf(),
+          ivyPath = ivyPath).createClient()
     }
 
     test(s"$version: createDatabase") {