diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 0fcba65ca6129ed11fe60d46888dea8507266dea..982ed63874a5fa4995a6bc12705c4fa9bae182f3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -95,6 +95,7 @@ private[hive] class ClientWrapper(
     case hive.v14 => new Shim_v0_14()
   }
 
+  // Create an internal session state for this ClientWrapper.
   val state = {
     val original = Thread.currentThread().getContextClassLoader
     Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
@@ -131,8 +132,15 @@ private[hive] class ClientWrapper(
    */
   private def withHiveState[A](f: => A): A = synchronized {
     val original = Thread.currentThread().getContextClassLoader
+    // This setContextClassLoader is used for Hive 0.12's metastore since Hive 0.12 will not
+    // internally override the context class loader of the current thread with the class loader
+    // associated with the HiveConf in `state`.
     Thread.currentThread().setContextClassLoader(getClass.getClassLoader)
+    // Set the thread local metastore client to the client associated with this ClientWrapper.
     Hive.set(client)
+    // Starting from Hive 0.13.0, setCurrentSessionState will use the classLoader associated
+    // with the HiveConf in `state` to override the context class loader of the current
+    // thread.
     shim.setCurrentSessionState(state)
     val ret = try f finally {
       Thread.currentThread().setContextClassLoader(original)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 195e5752c3ec0c56355ef77b56b801f83d2f8cfa..aad58bfa2e6e0cba6939c942c2d285abc750616d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -91,9 +91,15 @@ case class AddJar(path: String) extends RunnableCommand {
     val jarURL = new java.io.File(path).toURL
     val newClassLoader = new java.net.URLClassLoader(Array(jarURL), currentClassLoader)
     Thread.currentThread.setContextClassLoader(newClassLoader)
-    org.apache.hadoop.hive.ql.metadata.Hive.get().getConf().setClassLoader(newClassLoader)
-
-    // Add jar to isolated hive classloader
+    // We need to explicitly set the class loader associated with the conf in executionHive's
+    // state because this class loader will be used as the context class loader of the current
+    // thread to execute any Hive command.
+    // We cannot use `org.apache.hadoop.hive.ql.metadata.Hive.get().getConf()` because Hive.get()
+    // returns the value of a thread local variable and its HiveConf may not be the HiveConf
+    // associated with `executionHive.state` (for example, HiveContext is created in one thread
+    // and then add jar is called from another thread).
+    hiveContext.executionHive.state.getConf.setClassLoader(newClassLoader)
+    // Add jar to isolated hive (metadataHive) class loader.
     hiveContext.runSqlHive(s"ADD JAR $path")
 
     // Add jar to executors
diff --git a/sql/hive/src/test/resources/hive-contrib-0.13.1.jar b/sql/hive/src/test/resources/hive-contrib-0.13.1.jar
new file mode 100644
index 0000000000000000000000000000000000000000..ce0740d9245a795b3ae8009df99292a877ce09ac
Binary files /dev/null and b/sql/hive/src/test/resources/hive-contrib-0.13.1.jar differ
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f8908760cc8973f7f4ad88fb4c9a9a98ff2c62c6..984d97d27bf54d2a60eb55d32407bf17402d4990 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -934,4 +934,32 @@ class SQLQuerySuite extends QueryTest {
       sql("set hive.exec.dynamic.partition.mode=strict")
     }
   }
+
+  test("Call add jar in a different thread (SPARK-8306)") {
+    @volatile var error: Option[Throwable] = None
+    val thread = new Thread {
+      override def run() {
+        // To make sure this test works, this jar should not be loaded in another place.
+        TestHive.sql(
+          s"ADD JAR ${TestHive.getHiveFile("hive-contrib-0.13.1.jar").getCanonicalPath()}")
+        try {
+          TestHive.sql(
+            """
+              |CREATE TEMPORARY FUNCTION example_max
+              |AS 'org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleMax'
+            """.stripMargin)
+        } catch {
+          case throwable: Throwable =>
+            error = Some(throwable)
+        }
+      }
+    }
+    thread.start()
+    thread.join()
+    error match {
+      case Some(throwable) =>
+        fail("CREATE TEMPORARY FUNCTION should not fail.", throwable)
+      case None => // OK
+    }
+  }
 }
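The comments introduced by this patch rest on two facts: a class loader installed with setContextClassLoader is visible only to the thread that set it, and Hive.get() returns a thread-local Hive instance whose HiveConf may not be the one held by executionHive.state. The standalone Scala sketch below (not part of the patch; the object name and jar path are illustrative placeholders) demonstrates the first point, which is why AddJar must also attach the new URLClassLoader to a shared HiveConf via setClassLoader rather than relying on per-thread state, and why the SPARK-8306 test drives ADD JAR from a separate thread.

// Sketch only: shows that a context class loader set in one thread is not seen by another
// thread. "/tmp/some.jar" and ContextClassLoaderSketch are placeholders for illustration.
import java.net.{URL, URLClassLoader}

object ContextClassLoaderSketch {
  def main(args: Array[String]): Unit = {
    val original = Thread.currentThread().getContextClassLoader
    val jarUrl: URL = new java.io.File("/tmp/some.jar").toURI.toURL
    val withJar = new URLClassLoader(Array(jarUrl), original)

    // Thread A installs the loader only on its own context; the change dies with the thread.
    val threadA = new Thread {
      override def run(): Unit = Thread.currentThread().setContextClassLoader(withJar)
    }
    threadA.start()
    threadA.join()

    // Thread B inherits the creating thread's unchanged context class loader, so classes in
    // the jar are not resolvable here unless the loader is also stored on a shared object,
    // e.g. via HiveConf.setClassLoader as AddJar now does with executionHive.state.getConf.
    val threadB = new Thread {
      override def run(): Unit = {
        val seen = Thread.currentThread().getContextClassLoader
        println(s"thread B sees the jar-aware loader: ${seen eq withJar}") // prints false
      }
    }
    threadB.start()
    threadB.join()
  }
}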