From 5faba9faccb5ce43790c43284769e0f890340606 Mon Sep 17 00:00:00 2001 From: Ryan Blue <blue@apache.org> Date: Wed, 16 Mar 2016 22:57:06 -0700 Subject: [PATCH] [SPARK-13403][SQL] Pass hadoopConfiguration to HiveConf constructors. This commit updates the HiveContext so that sc.hadoopConfiguration is used to instantiate its internal instances of HiveConf. I tested this by overriding the S3 FileSystem implementation from spark-defaults.conf as "spark.hadoop.fs.s3.impl" (to avoid [HADOOP-12810](https://issues.apache.org/jira/browse/HADOOP-12810)). Author: Ryan Blue <blue@apache.org> Closes #11273 from rdblue/SPARK-13403-new-hive-conf-from-hadoop-conf. --- .../org/apache/spark/sql/hive/HiveContext.scala | 6 +++++- .../spark/sql/hive/client/HiveClientImpl.scala | 4 +++- .../sql/hive/client/IsolatedClientLoader.scala | 8 ++++++-- .../spark/sql/hive/HiveCatalogSuite.scala | 4 +++- .../spark/sql/hive/client/VersionsSuite.scala | 17 +++++++++++++++++ 5 files changed, 34 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 05fc569588..4238ad1ad4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -210,6 +210,7 @@ class HiveContext private[hive]( version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion), sparkConf = sc.conf, execJars = Seq(), + hadoopConf = sc.hadoopConfiguration, config = newTemporaryConfiguration(useInMemoryDerby = true), isolationOn = false, baseClassLoader = Utils.getContextOrSparkClassLoader) @@ -239,7 +240,7 @@ class HiveContext private[hive]( // We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options // into the isolated client loader - val metadataConf = new HiveConf() + val metadataConf = new HiveConf(sc.hadoopConfiguration, classOf[HiveConf]) val defaultWarehouseLocation = metadataConf.get("hive.metastore.warehouse.dir") logInfo("default warehouse location is " + defaultWarehouseLocation) @@ -279,6 +280,7 @@ class HiveContext private[hive]( version = metaVersion, sparkConf = sc.conf, execJars = jars.toSeq, + hadoopConf = sc.hadoopConfiguration, config = allConfig, isolationOn = true, barrierPrefixes = hiveMetastoreBarrierPrefixes, @@ -291,6 +293,7 @@ class HiveContext private[hive]( hiveMetastoreVersion = hiveMetastoreVersion, hadoopVersion = VersionInfo.getVersion, sparkConf = sc.conf, + hadoopConf = sc.hadoopConfiguration, config = allConfig, barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) @@ -320,6 +323,7 @@ class HiveContext private[hive]( version = metaVersion, sparkConf = sc.conf, execJars = jars.toSeq, + hadoopConf = sc.hadoopConfiguration, config = allConfig, isolationOn = true, barrierPrefixes = hiveMetastoreBarrierPrefixes, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 3040ec93f8..a5f0bbf678 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -22,6 +22,7 @@ import java.io.{File, PrintStream} import scala.collection.JavaConverters._ import scala.language.reflectiveCalls +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.cli.CliSessionState import org.apache.hadoop.hive.conf.HiveConf @@ -62,6 +63,7 @@ import org.apache.spark.util.{CircularBuffer, Utils} private[hive] class HiveClientImpl( override val version: HiveVersion, sparkConf: SparkConf, + hadoopConf: Configuration, config: Map[String, String], initClassLoader: ClassLoader, val clientLoader: IsolatedClientLoader) @@ -115,7 +117,7 @@ private[hive] class HiveClientImpl( // so we should keep `conf` and reuse the existing instance of `CliSessionState`. originalState } else { - val initialConf = new HiveConf(classOf[SessionState]) + val initialConf = new HiveConf(hadoopConf, classOf[SessionState]) // HiveConf is a Hadoop Configuration, which has a field of classLoader and // the initial value will be the current thread's context class loader // (i.e. initClassLoader at here). diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 024f4dfeba..932402a5f3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -26,6 +26,7 @@ import scala.language.reflectiveCalls import scala.util.Try import org.apache.commons.io.{FileUtils, IOUtils} +import org.apache.hadoop.conf.Configuration import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.SparkSubmitUtils @@ -42,6 +43,7 @@ private[hive] object IsolatedClientLoader extends Logging { hiveMetastoreVersion: String, hadoopVersion: String, sparkConf: SparkConf, + hadoopConf: Configuration, config: Map[String, String] = Map.empty, ivyPath: Option[String] = None, sharedPrefixes: Seq[String] = Seq.empty, @@ -79,6 +81,7 @@ private[hive] object IsolatedClientLoader extends Logging { hiveVersion(hiveMetastoreVersion), sparkConf, execJars = files, + hadoopConf = hadoopConf, config = config, sharesHadoopClasses = sharesHadoopClasses, sharedPrefixes = sharedPrefixes, @@ -149,6 +152,7 @@ private[hive] object IsolatedClientLoader extends Logging { private[hive] class IsolatedClientLoader( val version: HiveVersion, val sparkConf: SparkConf, + val hadoopConf: Configuration, val execJars: Seq[URL] = Seq.empty, val config: Map[String, String] = Map.empty, val isolationOn: Boolean = true, @@ -238,7 +242,7 @@ private[hive] class IsolatedClientLoader( /** The isolated client interface to Hive. */ private[hive] def createClient(): HiveClient = { if (!isolationOn) { - return new HiveClientImpl(version, sparkConf, config, baseClassLoader, this) + return new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this) } // Pre-reflective instantiation setup. logDebug("Initializing the logger to avoid disaster...") @@ -249,7 +253,7 @@ private[hive] class IsolatedClientLoader( classLoader .loadClass(classOf[HiveClientImpl].getName) .getConstructors.head - .newInstance(version, sparkConf, config, classLoader, this) + .newInstance(version, sparkConf, hadoopConf, config, classLoader, this) .asInstanceOf[HiveClient] } catch { case e: InvocationTargetException => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala index 0dc4fea22d..427f5747a0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hive +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.util.VersionInfo import org.apache.spark.SparkConf @@ -33,7 +34,8 @@ class HiveCatalogSuite extends CatalogTestCases { IsolatedClientLoader.forVersion( hiveMetastoreVersion = HiveContext.hiveExecutionVersion, hadoopVersion = VersionInfo.getVersion, - sparkConf = new SparkConf()).createClient() + sparkConf = new SparkConf(), + hadoopConf = new Configuration()).createClient() } protected override val utils: CatalogTestUtils = new CatalogTestUtils { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 3d54da11ad..f218ab80a7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.client import java.io.File +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.util.VersionInfo import org.apache.spark.{Logging, SparkConf, SparkFunSuite} @@ -63,12 +64,26 @@ class VersionsSuite extends SparkFunSuite with Logging { hiveMetastoreVersion = HiveContext.hiveExecutionVersion, hadoopVersion = VersionInfo.getVersion, sparkConf = sparkConf, + hadoopConf = new Configuration(), config = buildConf(), ivyPath = ivyPath).createClient() val db = new CatalogDatabase("default", "desc", "loc", Map()) badClient.createDatabase(db, ignoreIfExists = true) } + test("hadoop configuration preserved") { + val hadoopConf = new Configuration(); + hadoopConf.set("test", "success") + val client = IsolatedClientLoader.forVersion( + hiveMetastoreVersion = HiveContext.hiveExecutionVersion, + hadoopVersion = VersionInfo.getVersion, + sparkConf = sparkConf, + hadoopConf = hadoopConf, + config = buildConf(), + ivyPath = ivyPath).createClient() + assert("success" === client.getConf("test", null)) + } + private def getNestedMessages(e: Throwable): String = { var causes = "" var lastException = e @@ -98,6 +113,7 @@ class VersionsSuite extends SparkFunSuite with Logging { hiveMetastoreVersion = "13", hadoopVersion = VersionInfo.getVersion, sparkConf = sparkConf, + hadoopConf = new Configuration(), config = buildConf(), ivyPath = ivyPath).createClient() } @@ -118,6 +134,7 @@ class VersionsSuite extends SparkFunSuite with Logging { hiveMetastoreVersion = version, hadoopVersion = VersionInfo.getVersion, sparkConf = sparkConf, + hadoopConf = new Configuration(), config = buildConf(), ivyPath = ivyPath).createClient() } -- GitLab