From 9f94c85ff35df6289371f80edde51c2aa6c4bcdc Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Thu, 6 Aug 2015 09:53:53 -0700
Subject: [PATCH] [SPARK-9593] [SQL] [HOTFIX] Makes the Hadoop shims loading
 fix more robust

This is a follow-up of #7929.

We found that Jenkins SBT master build still fails because of the Hadoop shims loading issue. But the failure doesn't appear to be deterministic. My suspect is that Hadoop `VersionInfo` class may fail to inspect Hadoop version, and the shims loading branch is skipped.

This PR tries to make the fix more robust:

1. When Hadoop version is available, we load `Hadoop20SShims` for versions <= 2.0.x as srowen suggested in PR #7929.
2. Otherwise, we use `Path.getPathWithoutSchemeAndAuthority` as a probe method, which doesn't exist in Hadoop 1.x or 2.0.x. If this method is not found, `Hadoop20SShims` is also loaded.

Author: Cheng Lian <lian@databricks.com>

Closes #7994 from liancheng/spark-9593/fix-hadoop-shims and squashes the following commits:

e1d3d70 [Cheng Lian] Fixes typo in comments
8d971da [Cheng Lian] Makes the Hadoop shims loading fix more robust
---
 .../spark/sql/hive/client/ClientWrapper.scala | 88 ++++++++++++-------
 1 file changed, 55 insertions(+), 33 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 211a3b879c..3d05b583cf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -68,45 +68,67 @@ private[hive] class ClientWrapper(
 
   // !! HACK ALERT !!
   //
-  // This method is a surgical fix for Hadoop version 2.0.0-mr1-cdh4.1.1, which is used by Spark EC2
-  // scripts.  We should remove this after upgrading Spark EC2 scripts to some more recent Hadoop
-  // version in the future.
-  //
   // Internally, Hive `ShimLoader` tries to load different versions of Hadoop shims by checking
-  // version information gathered from Hadoop jar files.  If the major version number is 1,
-  // `Hadoop20SShims` will be loaded.  Otherwise, if the major version number is 2, `Hadoop23Shims`
-  // will be chosen.
+  // major version number gathered from Hadoop jar files:
+  //
+  // - For major version number 1, load `Hadoop20SShims`, where "20S" stands for Hadoop 0.20 with
+  //   security.
+  // - For major version number 2, load `Hadoop23Shims`, where "23" stands for Hadoop 0.23.
   //
-  // However, part of APIs in Hadoop 2.0.x and 2.1.x versions were in flux due to historical
-  // reasons. So 2.0.0-mr1-cdh4.1.1 is actually more Hadoop-1-like and should be used together with
-  // `Hadoop20SShims`, but `Hadoop20SShims` is chose because the major version number here is 2.
+  // However, APIs in Hadoop 2.0.x and 2.1.x versions were in flux due to historical reasons. It
+  // turns out that Hadoop 2.0.x versions should also be used together with `Hadoop20SShims`, but
+  // `Hadoop23Shims` is chosen because the major version number here is 2.
   //
-  // Here we check for this specific version and loads `Hadoop20SShims` via reflection.  Note that
-  // we can't check for string literal "2.0.0-mr1-cdh4.1.1" because the obtained version string
-  // comes from Maven artifact org.apache.hadoop:hadoop-common:2.0.0-cdh4.1.1, which doesn't have
-  // the "mr1" tag in its version string.
+  // To fix this issue, we try to inspect Hadoop version via `org.apache.hadoop.utils.VersionInfo`
+  // and load `Hadoop20SShims` for Hadoop 1.x and 2.0.x versions.  If Hadoop version information is
+  // not available, we decide whether to override the shims or not by checking for existence of a
+  // probe method which doesn't exist in Hadoop 1.x or 2.0.x versions.
   private def overrideHadoopShims(): Unit = {
-    val VersionPattern = """2\.0\.0.*cdh4.*""".r
-
-    VersionInfo.getVersion match {
-      case VersionPattern() =>
-        val shimClassName = "org.apache.hadoop.hive.shims.Hadoop20SShims"
-        logInfo(s"Loading Hadoop shims $shimClassName")
-
-        try {
-          val shimsField = classOf[ShimLoader].getDeclaredField("hadoopShims")
-          // scalastyle:off classforname
-          val shimsClass = Class.forName(shimClassName)
-          // scalastyle:on classforname
-          val shims = classOf[HadoopShims].cast(shimsClass.newInstance())
-          shimsField.setAccessible(true)
-          shimsField.set(null, shims)
-        } catch { case cause: Throwable =>
-          logError(s"Failed to load $shimClassName")
-          // Falls back to normal Hive `ShimLoader` logic
+    val hadoopVersion = VersionInfo.getVersion
+    val VersionPattern = """(\d+)\.(\d+).*""".r
+
+    hadoopVersion match {
+      case null =>
+        logError("Failed to inspect Hadoop version")
+
+        // Using "Path.getPathWithoutSchemeAndAuthority" as the probe method.
+        val probeMethod = "getPathWithoutSchemeAndAuthority"
+        if (!classOf[Path].getDeclaredMethods.exists(_.getName == probeMethod)) {
+          logInfo(
+            s"Method ${classOf[Path].getCanonicalName}.$probeMethod not found, " +
+              s"we are probably using Hadoop 1.x or 2.0.x")
+          loadHadoop20SShims()
+        }
+
+      case VersionPattern(majorVersion, minorVersion) =>
+        logInfo(s"Inspected Hadoop version: $hadoopVersion")
+
+        // Loads Hadoop20SShims for 1.x and 2.0.x versions
+        val (major, minor) = (majorVersion.toInt, minorVersion.toInt)
+        if (major < 2 || (major == 2 && minor == 0)) {
+          loadHadoop20SShims()
         }
+    }
+
+    // Logs the actual loaded Hadoop shims class
+    val loadedShimsClassName = ShimLoader.getHadoopShims.getClass.getCanonicalName
+    logInfo(s"Loaded $loadedShimsClassName for Hadoop version $hadoopVersion")
+  }
 
-      case _ =>
+  private def loadHadoop20SShims(): Unit = {
+    val hadoop20SShimsClassName = "org.apache.hadoop.hive.shims.Hadoop20SShims"
+    logInfo(s"Loading Hadoop shims $hadoop20SShimsClassName")
+
+    try {
+      val shimsField = classOf[ShimLoader].getDeclaredField("hadoopShims")
+      // scalastyle:off classforname
+      val shimsClass = Class.forName(hadoop20SShimsClassName)
+      // scalastyle:on classforname
+      val shims = classOf[HadoopShims].cast(shimsClass.newInstance())
+      shimsField.setAccessible(true)
+      shimsField.set(null, shims)
+    } catch { case cause: Throwable =>
+      throw new RuntimeException(s"Failed to load $hadoop20SShimsClassName", cause)
     }
   }
 
-- 
GitLab