diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 211a3b879c1b34a331f2f28d3cf262e030839a1b..3d05b583cf9e0d1391bef5f60a1d811e3e8f225d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -68,45 +68,77 @@ private[hive] class ClientWrapper(
 
   // !! HACK ALERT !!
   //
-  // This method is a surgical fix for Hadoop version 2.0.0-mr1-cdh4.1.1, which is used by Spark EC2
-  // scripts.  We should remove this after upgrading Spark EC2 scripts to some more recent Hadoop
-  // version in the future.
-  //
   // Internally, Hive `ShimLoader` tries to load different versions of Hadoop shims by checking
-  // version information gathered from Hadoop jar files.  If the major version number is 1,
-  // `Hadoop20SShims` will be loaded.  Otherwise, if the major version number is 2, `Hadoop23Shims`
-  // will be chosen.
+  // the major version number gathered from Hadoop jar files:
+  //
+  // - For major version number 1, load `Hadoop20SShims`, where "20S" stands for Hadoop 0.20 with
+  //   security.
+  // - For major version number 2, load `Hadoop23Shims`, where "23" stands for Hadoop 0.23.
   //
-  // However, part of APIs in Hadoop 2.0.x and 2.1.x versions were in flux due to historical
-  // reasons. So 2.0.0-mr1-cdh4.1.1 is actually more Hadoop-1-like and should be used together with
-  // `Hadoop20SShims`, but `Hadoop20SShims` is chose because the major version number here is 2.
+  // However, APIs in Hadoop 2.0.x and 2.1.x were still in flux for historical reasons. It turns
+  // out that Hadoop 2.0.x versions should also be used together with `Hadoop20SShims`, but
+  // `Hadoop23Shims` is chosen for them because their major version number is 2.
   //
-  // Here we check for this specific version and loads `Hadoop20SShims` via reflection.  Note that
-  // we can't check for string literal "2.0.0-mr1-cdh4.1.1" because the obtained version string
-  // comes from Maven artifact org.apache.hadoop:hadoop-common:2.0.0-cdh4.1.1, which doesn't have
-  // the "mr1" tag in its version string.
+  // To fix this issue, we try to inspect the Hadoop version via `org.apache.hadoop.util.VersionInfo`
+  // and load `Hadoop20SShims` for Hadoop 1.x and 2.0.x versions.  If the Hadoop version information
+  // is not available, we decide whether to override the shims by checking for the existence of a
+  // probe method that doesn't exist in Hadoop 1.x or 2.0.x versions.
   private def overrideHadoopShims(): Unit = {
-    val VersionPattern = """2\.0\.0.*cdh4.*""".r
-
-    VersionInfo.getVersion match {
-      case VersionPattern() =>
-        val shimClassName = "org.apache.hadoop.hive.shims.Hadoop20SShims"
-        logInfo(s"Loading Hadoop shims $shimClassName")
-
-        try {
-          val shimsField = classOf[ShimLoader].getDeclaredField("hadoopShims")
-          // scalastyle:off classforname
-          val shimsClass = Class.forName(shimClassName)
-          // scalastyle:on classforname
-          val shims = classOf[HadoopShims].cast(shimsClass.newInstance())
-          shimsField.setAccessible(true)
-          shimsField.set(null, shims)
-        } catch { case cause: Throwable =>
-          logError(s"Failed to load $shimClassName")
-          // Falls back to normal Hive `ShimLoader` logic
+    val hadoopVersion = VersionInfo.getVersion
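+    // Matches version strings such as "2.0.0-mr1-cdh4.1.1", capturing the major and minor
+    // version numbers ("2" and "0" in that example) as the two groups.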
+    val VersionPattern = """(\d+)\.(\d+).*""".r
+
+    hadoopVersion match {
+      case null =>
+        logError("Failed to inspect Hadoop version")
+
+        // Using "Path.getPathWithoutSchemeAndAuthority" as the probe method.
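+        // As explained above, this method doesn't exist in Hadoop 1.x or 2.0.x, so its absence
+        // indicates that `Hadoop20SShims` should be loaded.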
+        val probeMethod = "getPathWithoutSchemeAndAuthority"
+        if (!classOf[Path].getDeclaredMethods.exists(_.getName == probeMethod)) {
+          logInfo(
+            s"Method ${classOf[Path].getCanonicalName}.$probeMethod not found, " +
+              s"we are probably using Hadoop 1.x or 2.0.x")
+          loadHadoop20SShims()
+        }
+
+      case VersionPattern(majorVersion, minorVersion) =>
+        logInfo(s"Inspected Hadoop version: $hadoopVersion")
+
+        // Loads Hadoop20SShims for 1.x and 2.0.x versions
+        val (major, minor) = (majorVersion.toInt, minorVersion.toInt)
+        if (major < 2 || (major == 2 && minor == 0)) {
+          loadHadoop20SShims()
         }
+
+      case _ =>
+        logError(s"Failed to parse Hadoop version: $hadoopVersion")
+    }
+
+    // Logs the actual loaded Hadoop shims class
+    val loadedShimsClassName = ShimLoader.getHadoopShims.getClass.getCanonicalName
+    logInfo(s"Loaded $loadedShimsClassName for Hadoop version $hadoopVersion")
+  }
 
-      case _ =>
+  private def loadHadoop20SShims(): Unit = {
+    val hadoop20SShimsClassName = "org.apache.hadoop.hive.shims.Hadoop20SShims"
+    logInfo(s"Loading Hadoop shims $hadoop20SShimsClassName")
+
+    try {
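+      // Hive's `ShimLoader` caches the loaded shims in its private static `hadoopShims` field.
+      // Overwriting that field via reflection makes `ShimLoader.getHadoopShims` return the
+      // `Hadoop20SShims` instance created here.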
+      val shimsField = classOf[ShimLoader].getDeclaredField("hadoopShims")
+      // scalastyle:off classforname
+      val shimsClass = Class.forName(hadoop20SShimsClassName)
+      // scalastyle:on classforname
+      val shims = classOf[HadoopShims].cast(shimsClass.newInstance())
+      shimsField.setAccessible(true)
+      shimsField.set(null, shims)
+    } catch { case cause: Throwable =>
+      throw new RuntimeException(s"Failed to load $hadoop20SShimsClassName", cause)
     }
   }