From 40d3c6797a3dfd037eb69b2bcd336d8544deddf5 Mon Sep 17 00:00:00 2001
From: Steve Loughran <stevel@hortonworks.com>
Date: Sat, 31 Oct 2015 18:23:15 -0700
Subject: [PATCH] [SPARK-11265][YARN] YarnClient can't get tokens to talk to
 Hive 1.2.1 in a secure cluster

This is a fix for SPARK-11265; the introspection code to get Hive delegation tokens failing on Spark 1.5.1+, due to changes in the Hive codebase

Author: Steve Loughran <stevel@hortonworks.com>

Closes #9232 from steveloughran/stevel/patches/SPARK-11265-hive-tokens.
---
 yarn/pom.xml                                  | 25 +++++++
 .../org/apache/spark/deploy/yarn/Client.scala | 51 +------------
 .../deploy/yarn/YarnSparkHadoopUtil.scala     | 73 +++++++++++++++++++
 .../yarn/YarnSparkHadoopUtilSuite.scala       | 29 ++++++++
 4 files changed, 129 insertions(+), 49 deletions(-)

diff --git a/yarn/pom.xml b/yarn/pom.xml
index 3eadacba13..989b820bec 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -162,6 +162,31 @@
        <artifactId>jersey-server</artifactId>
        <scope>test</scope>
      </dependency>
+
+    <!--
+     Testing Hive reflection needs hive on the test classpath only.
+     It doesn't need the spark hive modules, so the -Phive flag is not checked.
+      -->
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-exec</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libfb303</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 4954b61809..a3f33d8018 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1337,55 +1337,8 @@ object Client extends Logging {
       conf: Configuration,
       credentials: Credentials) {
     if (shouldGetTokens(sparkConf, "hive") && UserGroupInformation.isSecurityEnabled) {
-      val mirror = universe.runtimeMirror(getClass.getClassLoader)
-
-      try {
-        val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
-        val hiveConf = hiveConfClass.newInstance()
-
-        val hiveConfGet = (param: String) => Option(hiveConfClass
-          .getMethod("get", classOf[java.lang.String])
-          .invoke(hiveConf, param))
-
-        val metastore_uri = hiveConfGet("hive.metastore.uris")
-
-        // Check for local metastore
-        if (metastore_uri != None && metastore_uri.get.toString.size > 0) {
-          val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
-          val hive = hiveClass.getMethod("get").invoke(null, hiveConf.asInstanceOf[Object])
-
-          val metastore_kerberos_principal_conf_var = mirror.classLoader
-            .loadClass("org.apache.hadoop.hive.conf.HiveConf$ConfVars")
-            .getField("METASTORE_KERBEROS_PRINCIPAL").get("varname").toString
-
-          val principal = hiveConfGet(metastore_kerberos_principal_conf_var)
-
-          val username = Option(UserGroupInformation.getCurrentUser().getUserName)
-          if (principal != None && username != None) {
-            val tokenStr = hiveClass.getMethod("getDelegationToken",
-              classOf[java.lang.String], classOf[java.lang.String])
-              .invoke(hive, username.get, principal.get).asInstanceOf[java.lang.String]
-
-            val hive2Token = new Token[DelegationTokenIdentifier]()
-            hive2Token.decodeFromUrlString(tokenStr)
-            credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token)
-            logDebug("Added hive.Server2.delegation.token to conf.")
-            hiveClass.getMethod("closeCurrent").invoke(null)
-          } else {
-            logError("Username or principal == NULL")
-            logError(s"""username=${username.getOrElse("(NULL)")}""")
-            logError(s"""principal=${principal.getOrElse("(NULL)")}""")
-            throw new IllegalArgumentException("username and/or principal is equal to null!")
-          }
-        } else {
-          logDebug("HiveMetaStore configured in localmode")
-        }
-      } catch {
-        case e: java.lang.NoSuchMethodException => { logInfo("Hive Method not found " + e); return }
-        case e: java.lang.ClassNotFoundException => { logInfo("Hive Class not found " + e); return }
-        case e: Exception => { logError("Unexpected Exception " + e)
-          throw new RuntimeException("Unexpected exception", e)
-        }
+      YarnSparkHadoopUtil.get.obtainTokenForHiveMetastore(conf).foreach {
+        credentials.addToken(new Text("hive.server2.delegation.token"), _)
       }
     }
   }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index f276e7efde..5924daf3ec 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -22,14 +22,17 @@ import java.util.regex.Matcher
 import java.util.regex.Pattern
 
 import scala.collection.mutable.HashMap
+import scala.reflect.runtime._
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.{Master, JobConf}
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.Token
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -142,6 +145,76 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
     ConverterUtils.toContainerId(containerIdString)
   }
+
+  /**
+   * Obtains token for the Hive metastore, using the current user as the principal.
+   * Some exceptions are caught and downgraded to a log message.
+   * @param conf hadoop configuration; the Hive configuration will be based on this
+   * @return a token, or `None` if there's no need for a token (no metastore URI or principal
+   *         in the config), or if a binding exception was caught and downgraded.
+   */
+  def obtainTokenForHiveMetastore(conf: Configuration): Option[Token[DelegationTokenIdentifier]] = {
+    try {
+      obtainTokenForHiveMetastoreInner(conf, UserGroupInformation.getCurrentUser().getUserName)
+    } catch {
+      case e: ClassNotFoundException =>
+        logInfo(s"Hive class not found $e")
+        logDebug("Hive class not found", e)
+        None
+    }
+  }
+
+  /**
+   * Inner routine to obtains token for the Hive metastore; exceptions are raised on any problem.
+   * @param conf hadoop configuration; the Hive configuration will be based on this.
+   * @param username the username of the principal requesting the delegating token.
+   * @return a delegation token
+   */
+  private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration,
+      username: String): Option[Token[DelegationTokenIdentifier]] = {
+    val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
+
+    // the hive configuration class is a subclass of Hadoop Configuration, so can be cast down
+    // to a Configuration and used without reflection
+    val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
+    // using the (Configuration, Class) constructor allows the current configuratin to be included
+    // in the hive config.
+    val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration],
+      classOf[Object].getClass)
+    val hiveConf = ctor.newInstance(conf, hiveConfClass).asInstanceOf[Configuration]
+    val metastoreUri = hiveConf.getTrimmed("hive.metastore.uris", "")
+
+    // Check for local metastore
+    if (metastoreUri.nonEmpty) {
+      require(username.nonEmpty, "Username undefined")
+      val principalKey = "hive.metastore.kerberos.principal"
+      val principal = hiveConf.getTrimmed(principalKey, "")
+      require(principal.nonEmpty, "Hive principal $principalKey undefined")
+      logDebug(s"Getting Hive delegation token for $username against $principal at $metastoreUri")
+      val hiveClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
+      val closeCurrent = hiveClass.getMethod("closeCurrent")
+      try {
+        // get all the instance methods before invoking any
+        val getDelegationToken = hiveClass.getMethod("getDelegationToken",
+          classOf[String], classOf[String])
+        val getHive = hiveClass.getMethod("get", hiveConfClass)
+
+        // invoke
+        val hive = getHive.invoke(null, hiveConf)
+        val tokenStr = getDelegationToken.invoke(hive, username, principal).asInstanceOf[String]
+        val hive2Token = new Token[DelegationTokenIdentifier]()
+        hive2Token.decodeFromUrlString(tokenStr)
+        Some(hive2Token)
+      } finally {
+        Utils.tryLogNonFatalError {
+          closeCurrent.invoke(null)
+        }
+      }
+    } else {
+      logDebug("HiveMetaStore configured in localmode")
+      None
+    }
+  }
 }
 
 object YarnSparkHadoopUtil {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index e1c67db765..9132c56a91 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -18,10 +18,12 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.{File, IOException}
+import java.lang.reflect.InvocationTargetException
 
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.ql.metadata.HiveException
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -245,4 +247,31 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
       System.clearProperty("SPARK_YARN_MODE")
     }
   }
+
+  test("Obtain tokens For HiveMetastore") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("hive.metastore.kerberos.principal", "bob")
+    // thrift picks up on port 0 and bails out, without trying to talk to endpoint
+    hadoopConf.set("hive.metastore.uris", "http://localhost:0")
+    val util = new YarnSparkHadoopUtil
+    assertNestedHiveException(intercept[InvocationTargetException] {
+      util.obtainTokenForHiveMetastoreInner(hadoopConf, "alice")
+    })
+    // expect exception trapping code to unwind this hive-side exception
+    assertNestedHiveException(intercept[InvocationTargetException] {
+      util.obtainTokenForHiveMetastore(hadoopConf)
+    })
+  }
+
+  def assertNestedHiveException(e: InvocationTargetException): Throwable = {
+    val inner = e.getCause
+    if (inner == null) {
+      fail("No inner cause", e)
+    }
+    if (!inner.isInstanceOf[HiveException]) {
+      fail("Not a hive exception", inner)
+    }
+    inner
+  }
+
 }
-- 
GitLab