diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index dc372be0e5a3761785c2568fbc3bbd1280e0cd6d..211a3b879c1b34a331f2f28d3cf262e030839a1b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -32,6 +32,8 @@ import org.apache.hadoop.hive.ql.metadata.Hive
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.ql.{Driver, metadata}
+import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
+import org.apache.hadoop.util.VersionInfo
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -62,6 +64,52 @@ private[hive] class ClientWrapper(
   extends ClientInterface
   with Logging {
 
+  overrideHadoopShims()
+
+  // !! HACK ALERT !!
+  //
+  // This method is a surgical fix for Hadoop version 2.0.0-mr1-cdh4.1.1, which is used by Spark EC2
+  // scripts. We should remove this after upgrading the Spark EC2 scripts to a more recent Hadoop
+  // version in the future.
+  //
+  // Internally, Hive `ShimLoader` tries to load different versions of Hadoop shims by checking
+  // version information gathered from Hadoop jar files. If the major version number is 1,
+  // `Hadoop20SShims` will be loaded. Otherwise, if the major version number is 2, `Hadoop23Shims`
+  // will be chosen.
+  //
+  // However, some APIs in the Hadoop 2.0.x and 2.1.x versions were in flux due to historical
+  // reasons. So 2.0.0-mr1-cdh4.1.1 is actually more Hadoop-1-like and should be used together with
+  // `Hadoop20SShims`, but `Hadoop23Shims` is chosen because the major version number here is 2.
+  //
+  // Here we check for this specific version and load `Hadoop20SShims` via reflection. Note that
+  // we can't check for the string literal "2.0.0-mr1-cdh4.1.1" because the obtained version string
+  // comes from Maven artifact org.apache.hadoop:hadoop-common:2.0.0-cdh4.1.1, which doesn't have
+  // the "mr1" tag in its version string.
+  private def overrideHadoopShims(): Unit = {
+    val VersionPattern = """2\.0\.0.*cdh4.*""".r
+
+    VersionInfo.getVersion match {
+      case VersionPattern() =>
+        val shimClassName = "org.apache.hadoop.hive.shims.Hadoop20SShims"
+        logInfo(s"Loading Hadoop shims $shimClassName")
+
+        try {
+          val shimsField = classOf[ShimLoader].getDeclaredField("hadoopShims")
+          // scalastyle:off classforname
+          val shimsClass = Class.forName(shimClassName)
+          // scalastyle:on classforname
+          val shims = classOf[HadoopShims].cast(shimsClass.newInstance())
+          shimsField.setAccessible(true)
+          shimsField.set(null, shims)
+        } catch { case cause: Throwable =>
+          logError(s"Failed to load $shimClassName", cause)
+          // Falls back to normal Hive `ShimLoader` logic
+        }
+
+      case _ =>
+    }
+  }
+
   // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
   private val outputBuffer = new CircularBuffer()
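
For reference, below is a minimal, self-contained Scala sketch of the shim-selection behavior the patch works around and the pattern-based override it installs. It is illustrative only: `ShimSelectionSketch` and `pickShimClassName` are hypothetical names, and the dispatch shown is a paraphrase of what the comment in the patch describes, not Hive's actual `ShimLoader` implementation.

// Illustrative sketch only; mirrors the version dispatch described in the patch comment.
object ShimSelectionSketch {
  // Same pattern the patch uses: the Maven artifact reports "2.0.0-cdh4.1.1" (no "mr1" tag),
  // so the check matches a pattern rather than the literal "2.0.0-mr1-cdh4.1.1".
  private val Cdh4Pattern = """2\.0\.0.*cdh4.*""".r

  def pickShimClassName(hadoopVersion: String): String = hadoopVersion match {
    // The patch special-cases 2.0.0-cdh4.x first, since it is Hadoop-1-like despite the "2" prefix.
    case Cdh4Pattern() => "org.apache.hadoop.hive.shims.Hadoop20SShims"
    // Roughly what Hive's ShimLoader does otherwise: dispatch on the major version number.
    case v if v.startsWith("1.") => "org.apache.hadoop.hive.shims.Hadoop20SShims"
    case v if v.startsWith("2.") => "org.apache.hadoop.hive.shims.Hadoop23Shims"
    case v => sys.error(s"Unrecognized Hadoop version: $v")
  }

  def main(args: Array[String]): Unit = {
    Seq("2.0.0-cdh4.1.1", "2.0.0-mr1-cdh4.1.1", "1.2.1", "2.6.0").foreach { v =>
      println(s"$v -> ${pickShimClassName(v)}")
    }
  }
}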