From b83b502c4189c571bda776511c6f7541c6067aae Mon Sep 17 00:00:00 2001
From: Kent Yao <yaooqinn@hotmail.com>
Date: Fri, 18 Aug 2017 00:24:45 +0800
Subject: [PATCH] [SPARK-21428] Turn IsolatedClientLoader off while using builtin Hive jars for reusing CliSessionState
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What changes were proposed in this pull request?

Turn isolation off (`isolationOn = false`) when the builtin Hive jars are in use and `SessionState.get` returns a `CliSessionState` instance, so that the existing `CliSessionState` is reused instead of being shadowed by a new session state created inside the isolated class loader.

## How was this patch tested?

1. Unit tests
2. Manually verified: the `hive.exec.scratchdir` directory was only created once because the `CliSessionState` is reused

```java
➜  spark git:(SPARK-21428) ✗ bin/spark-sql --conf spark.sql.hive.metastore.jars=builtin
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/07/16 23:59:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/16 23:59:27 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
17/07/16 23:59:27 INFO ObjectStore: ObjectStore, initialize called
17/07/16 23:59:28 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
17/07/16 23:59:28 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
17/07/16 23:59:29 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
17/07/16 23:59:30 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:30 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:31 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:31 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:31 INFO MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
17/07/16 23:59:31 INFO ObjectStore: Initialized ObjectStore
17/07/16 23:59:31 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/07/16 23:59:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
17/07/16 23:59:32 INFO HiveMetaStore: Added admin role in metastore
17/07/16 23:59:32 INFO HiveMetaStore: Added public role in metastore
17/07/16 23:59:32 INFO HiveMetaStore: No user is added in admin role, since config is empty
17/07/16 23:59:32 INFO HiveMetaStore: 0: get_all_databases
17/07/16 23:59:32 INFO audit: ugi=Kent ip=unknown-ip-addr cmd=get_all_databases
17/07/16 23:59:32 INFO HiveMetaStore: 0: get_functions: db=default pat=*
17/07/16 23:59:32 INFO audit: ugi=Kent ip=unknown-ip-addr cmd=get_functions: db=default pat=*
17/07/16 23:59:32 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
17/07/16 23:59:32 INFO SessionState: Created local directory: /var/folders/k2/04p4k4ws73l6711h_mz2_tq00000gn/T/beea7261-221a-4711-89e8-8b12a9d37370_resources
17/07/16 23:59:32 INFO SessionState: Created HDFS directory: /tmp/hive/Kent/beea7261-221a-4711-89e8-8b12a9d37370
17/07/16 23:59:32 INFO SessionState: Created local directory: /var/folders/k2/04p4k4ws73l6711h_mz2_tq00000gn/T/Kent/beea7261-221a-4711-89e8-8b12a9d37370
17/07/16 23:59:32 INFO SessionState: Created HDFS directory: /tmp/hive/Kent/beea7261-221a-4711-89e8-8b12a9d37370/_tmp_space.db
17/07/16 23:59:32 INFO SparkContext: Running Spark version 2.3.0-SNAPSHOT
17/07/16 23:59:32 INFO SparkContext: Submitted application: SparkSQL::10.0.0.8
17/07/16 23:59:32 INFO SecurityManager: Changing view acls to: Kent
17/07/16 23:59:32 INFO SecurityManager: Changing modify acls to: Kent
17/07/16 23:59:32 INFO SecurityManager: Changing view acls groups to:
17/07/16 23:59:32 INFO SecurityManager: Changing modify acls groups to:
17/07/16 23:59:32 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Kent); groups with view permissions: Set(); users with modify permissions: Set(Kent); groups with modify permissions: Set()
17/07/16 23:59:33 INFO Utils: Successfully started service 'sparkDriver' on port 51889.
17/07/16 23:59:33 INFO SparkEnv: Registering MapOutputTracker
17/07/16 23:59:33 INFO SparkEnv: Registering BlockManagerMaster
17/07/16 23:59:33 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/07/16 23:59:33 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/07/16 23:59:33 INFO DiskBlockManager: Created local directory at /private/var/folders/k2/04p4k4ws73l6711h_mz2_tq00000gn/T/blockmgr-9cfae28a-01e9-4c73-a1f1-f76fa52fc7a5
17/07/16 23:59:33 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
17/07/16 23:59:33 INFO SparkEnv: Registering OutputCommitCoordinator
17/07/16 23:59:33 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/07/16 23:59:33 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.0.0.8:4040
17/07/16 23:59:33 INFO Executor: Starting executor ID driver on host localhost
17/07/16 23:59:33 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51890.
17/07/16 23:59:33 INFO NettyBlockTransferService: Server created on 10.0.0.8:51890
17/07/16 23:59:33 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/07/16 23:59:33 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:33 INFO BlockManagerMasterEndpoint: Registering block manager 10.0.0.8:51890 with 366.3 MB RAM, BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:33 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:33 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.0.0.8, 51890, None)
17/07/16 23:59:34 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/Kent/Documents/spark/spark-warehouse').
17/07/16 23:59:34 INFO SharedState: Warehouse path is 'file:/Users/Kent/Documents/spark/spark-warehouse'.
17/07/16 23:59:34 INFO HiveUtils: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
17/07/16 23:59:34 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.2) is /user/hive/warehouse
17/07/16 23:59:34 INFO HiveMetaStore: 0: get_database: default
17/07/16 23:59:34 INFO audit: ugi=Kent ip=unknown-ip-addr cmd=get_database: default
17/07/16 23:59:34 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.2) is /user/hive/warehouse
17/07/16 23:59:34 INFO HiveMetaStore: 0: get_database: global_temp
17/07/16 23:59:34 INFO audit: ugi=Kent ip=unknown-ip-addr cmd=get_database: global_temp
17/07/16 23:59:34 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
17/07/16 23:59:34 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.2) is /user/hive/warehouse
17/07/16 23:59:34 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
spark-sql>
```

cc cloud-fan gatorsmile

Author: Kent Yao <yaooqinn@hotmail.com>
Author: hzyaoqin <hzyaoqin@corp.netease.com>

Closes #18648 from yaooqinn/SPARK-21428.
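For reviewers, the core decision this patch adds is small. The sketch below is condensed from the `HiveUtils` and `HiveClientImpl` hunks further down (it is not an exact copy of the patched code; local names differ slightly):

```scala
import org.apache.hadoop.hive.ql.session.SessionState

// Walk the class hierarchy of the current thread's SessionState (if any) and check
// whether it is a CliSessionState started earlier by SparkSQLCLIDriver.
def isCliSessionState(): Boolean = {
  val state = SessionState.get
  var clazz: Class[_] = if (state != null) state.getClass else null
  var found = false
  while (clazz != null && !found) {
    found = clazz.getName == "org.apache.hadoop.hive.cli.CliSessionState"
    clazz = clazz.getSuperclass
  }
  found
}

// With spark.sql.hive.metastore.jars=builtin the client loader is created with
//   isolationOn = !isCliSessionState()
// so bin/spark-sql reuses the existing CliSessionState (and its scratch directories)
// instead of building a second SessionState inside an isolated class loader.
```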
---
 .../apache/spark/deploy/SparkHadoopUtil.scala |  12 +-
 .../org/apache/spark/deploy/SparkSubmit.scala |  19 +--
 .../HiveCliSessionStateSuite.scala            |  64 +++++++
 .../org/apache/spark/sql/hive/HiveUtils.scala |  19 ++-
 .../spark/sql/hive/client/HiveClient.scala    |   6 +
 .../sql/hive/client/HiveClientImpl.scala      | 158 ++++++++----------
 6 files changed, 170 insertions(+), 108 deletions(-)
 create mode 100644 sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e26f61dd3e..2a92ef99b9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.IOException
+import java.io.{File, IOException}
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
 import java.util.{Arrays, Comparator, Date, Locale}
@@ -155,8 +155,14 @@ class SparkHadoopUtil extends Logging {
   def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
 
-  def loginUserFromKeytab(principalName: String, keytabFilename: String) {
-    UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
+  def loginUserFromKeytab(principalName: String, keytabFilename: String): Unit = {
+    if (!new File(keytabFilename).exists()) {
+      throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
+    } else {
+      logInfo("Attempting to login to Kerberos" +
+        s" using principal: ${principalName} and keytab: ${keytabFilename}")
+      UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
+    }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 019780076e..6d744a084a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -559,18 +559,13 @@ object SparkSubmit extends CommandLineUtils {
     if (clusterManager == YARN || clusterManager == LOCAL) {
       if (args.principal != null) {
         require(args.keytab != null, "Keytab must be specified when principal is specified")
-        if (!new File(args.keytab).exists()) {
-          throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
-        } else {
-          // Add keytab and principal configurations in sysProps to make them available
-          // for later use; e.g. in spark sql, the isolated class loader used to talk
-          // to HiveMetastore will use these settings. They will be set as Java system
-          // properties and then loaded by SparkConf
-          sysProps.put("spark.yarn.keytab", args.keytab)
-          sysProps.put("spark.yarn.principal", args.principal)
-
-          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
-        }
+        SparkHadoopUtil.get.loginUserFromKeytab(args.principal, args.keytab)
+        // Add keytab and principal configurations in sysProps to make them available
+        // for later use; e.g. in spark sql, the isolated class loader used to talk
+        // to HiveMetastore will use these settings. They will be set as Java system
+        // properties and then loaded by SparkConf
+        sysProps.put("spark.yarn.keytab", args.keytab)
+        sysProps.put("spark.yarn.principal", args.principal)
       }
     }
 
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
new file mode 100644
index 0000000000..5f9ea4d267
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import org.apache.hadoop.hive.cli.CliSessionState
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.session.SessionState
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.hive.HiveUtils
+
+class HiveCliSessionStateSuite extends SparkFunSuite {
+
+  def withSessionClear(f: () => Unit): Unit = {
+    try f() finally SessionState.detachSession()
+  }
+
+  test("CliSessionState will be reused") {
+    withSessionClear { () =>
+      val hiveConf = new HiveConf(classOf[SessionState])
+      HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
+        case (key, value) => hiveConf.set(key, value)
+      }
+      val sessionState: SessionState = new CliSessionState(hiveConf)
+      SessionState.start(sessionState)
+      val s1 = SessionState.get
+      val sparkConf = new SparkConf()
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+      val s2 = HiveUtils.newClientForMetadata(sparkConf, hadoopConf).getState
+      assert(s1 === s2)
+      assert(s2.isInstanceOf[CliSessionState])
+    }
+  }
+
+  test("SessionState will not be reused") {
+    withSessionClear { () =>
+      val sparkConf = new SparkConf()
+      val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+      HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
+        case (key, value) => hadoopConf.set(key, value)
+      }
+      val hiveClient = HiveUtils.newClientForMetadata(sparkConf, hadoopConf)
+      val s1 = hiveClient.getState
+      val s2 = hiveClient.newSession().getState
+      assert(s1 !== s2)
+    }
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 643925064d..561c127a40 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.hadoop.util.VersionInfo
@@ -230,6 +231,22 @@ private[spark] object HiveUtils extends Logging {
     }.toMap
   }
 
+  /**
+   * Check current Thread's SessionState type
+   * @return true when SessionState.get returns an instance of CliSessionState,
+   *         false when it gets non-CliSessionState instance or null
+   */
+  def isCliSessionState(): Boolean = {
+    val state = SessionState.get
+    var temp: Class[_] = if (state != null) state.getClass else null
+    var found = false
+    while (temp != null && !found) {
+      found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
+      temp = temp.getSuperclass
+    }
+    found
+  }
+
   /**
    * Create a [[HiveClient]] used for execution.
    *
@@ -313,7 +330,7 @@ private[spark] object HiveUtils extends Logging {
       hadoopConf = hadoopConf,
       execJars = jars.toSeq,
       config = configurations,
-      isolationOn = true,
+      isolationOn = !isCliSessionState(),
       barrierPrefixes = hiveMetastoreBarrierPrefixes,
       sharedPrefixes = hiveMetastoreSharedPrefixes)
   } else if (hiveMetastoreJars == "maven") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 16a80f9fff..8cff0ca096 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -38,6 +38,12 @@ private[hive] trait HiveClient {
   /** Returns the configuration for the given key in the current session. */
   def getConf(key: String, defaultValue: String): String
 
+  /**
+   * Return the associated Hive SessionState of this [[HiveClientImpl]]
+   * @return [[Any]] not SessionState to avoid linkage error
+   */
+  def getState: Any
+
   /**
    * Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will
    * result in one string.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index bde9a81c65..5e5c0a2a50 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -35,9 +35,9 @@ import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Tab
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.AnalysisException
@@ -105,100 +105,33 @@ private[hive] class HiveClientImpl(
   // Create an internal session state for this HiveClientImpl.
   val state: SessionState = {
     val original = Thread.currentThread().getContextClassLoader
-    // Switch to the initClassLoader.
-    Thread.currentThread().setContextClassLoader(initClassLoader)
-
-    // Set up kerberos credentials for UserGroupInformation.loginUser within
-    // current class loader
-    if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
-      val principalName = sparkConf.get("spark.yarn.principal")
-      val keytabFileName = sparkConf.get("spark.yarn.keytab")
-      if (!new File(keytabFileName).exists()) {
-        throw new SparkException(s"Keytab file: ${keytabFileName}" +
-          " specified in spark.yarn.keytab does not exist")
-      } else {
-        logInfo("Attempting to login to Kerberos" +
-          s" using principal: ${principalName} and keytab: ${keytabFileName}")
-        UserGroupInformation.loginUserFromKeytab(principalName, keytabFileName)
-      }
-    }
-
-    def isCliSessionState(state: SessionState): Boolean = {
-      var temp: Class[_] = if (state != null) state.getClass else null
-      var found = false
-      while (temp != null && !found) {
-        found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
-        temp = temp.getSuperclass
+    if (clientLoader.isolationOn) {
+      // Switch to the initClassLoader.
+      Thread.currentThread().setContextClassLoader(initClassLoader)
+      // Set up kerberos credentials for UserGroupInformation.loginUser within current class loader
+      if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
+        val principal = sparkConf.get("spark.yarn.principal")
+        val keytab = sparkConf.get("spark.yarn.keytab")
+        SparkHadoopUtil.get.loginUserFromKeytab(principal, keytab)
       }
-      found
-    }
-
-    val ret = try {
-      // originState will be created if not exists, will never be null
-      val originalState = SessionState.get()
-      if (isCliSessionState(originalState)) {
-        // In `SparkSQLCLIDriver`, we have already started a `CliSessionState`,
-        // which contains information like configurations from command line. Later
-        // we call `SparkSQLEnv.init()` there, which would run into this part again.
-        // so we should keep `conf` and reuse the existing instance of `CliSessionState`.
-        originalState
-      } else {
-        val hiveConf = new HiveConf(classOf[SessionState])
-        // 1: we set all confs in the hadoopConf to this hiveConf.
-        // This hadoopConf contains user settings in Hadoop's core-site.xml file
-        // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
-        // SharedState and put settings in this hadoopConf instead of relying on HiveConf
-        // to load user settings. Otherwise, HiveConf's initialize method will override
-        // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
-        // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
-        // has hive-site.xml. So, HiveConf will use that to override its default values.
-        hadoopConf.iterator().asScala.foreach { entry =>
-          val key = entry.getKey
-          val value = entry.getValue
-          if (key.toLowerCase(Locale.ROOT).contains("password")) {
-            logDebug(s"Applying Hadoop and Hive config to Hive Conf: $key=xxx")
-          } else {
-            logDebug(s"Applying Hadoop and Hive config to Hive Conf: $key=$value")
-          }
-          hiveConf.set(key, value)
-        }
-        // HiveConf is a Hadoop Configuration, which has a field of classLoader and
-        // the initial value will be the current thread's context class loader
-        // (i.e. initClassLoader at here).
-        // We call initialConf.setClassLoader(initClassLoader) at here to make
-        // this action explicit.
-        hiveConf.setClassLoader(initClassLoader)
-        // 2: we set all spark confs to this hiveConf.
-        sparkConf.getAll.foreach { case (k, v) =>
-          if (k.toLowerCase(Locale.ROOT).contains("password")) {
-            logDebug(s"Applying Spark config to Hive Conf: $k=xxx")
-          } else {
-            logDebug(s"Applying Spark config to Hive Conf: $k=$v")
-          }
-          hiveConf.set(k, v)
-        }
-        // 3: we set all entries in config to this hiveConf.
-        extraConfig.foreach { case (k, v) =>
-          if (k.toLowerCase(Locale.ROOT).contains("password")) {
-            logDebug(s"Applying extra config to HiveConf: $k=xxx")
-          } else {
-            logDebug(s"Applying extra config to HiveConf: $k=$v")
-          }
-          hiveConf.set(k, v)
-        }
-        val state = new SessionState(hiveConf)
-        if (clientLoader.cachedHive != null) {
-          Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
-        }
-        SessionState.start(state)
-        state.out = new PrintStream(outputBuffer, true, "UTF-8")
-        state.err = new PrintStream(outputBuffer, true, "UTF-8")
-        state
+      try {
+        newState()
+      } finally {
+        Thread.currentThread().setContextClassLoader(original)
       }
-    } finally {
-      Thread.currentThread().setContextClassLoader(original)
+    } else {
+      // Isolation off means we detect a CliSessionState instance in the current thread.
+      // 1: Inside the Spark project, we have already started a CliSessionState in
+      // `SparkSQLCLIDriver`, which contains configurations from the command line. Later, we call
+      // `SparkSQLEnv.init()` there, which would create a Hive client again, so we should keep
+      // those configurations and reuse the existing instance of `CliSessionState`. In this case,
+      // SessionState.get will always return a CliSessionState.
+      // 2: In another case, a user app may start a CliSessionState outside the Spark project with
+      // builtin Hive jars, which will turn off isolation. If SessionState.detachSession is
+      // called to remove the current state after that, the Hive client created later will
+      // initialize its own state by newState().
+      Option(SessionState.get).getOrElse(newState())
     }
-    ret
   }
 
   // Log the default warehouse location.
@@ -206,6 +139,44 @@ private[hive] class HiveClientImpl(
     s"Warehouse location for Hive client " +
       s"(version ${version.fullVersion}) is ${conf.get("hive.metastore.warehouse.dir")}")
 
+  private def newState(): SessionState = {
+    val hiveConf = new HiveConf(classOf[SessionState])
+    // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+    // the initial value will be the current thread's context class loader
+    // (i.e. initClassLoader at here).
+    // We call initialConf.setClassLoader(initClassLoader) at here to make
+    // this action explicit.
+    hiveConf.setClassLoader(initClassLoader)
+
+    // 1: Take all from the hadoopConf to this hiveConf.
+    // This hadoopConf contains user settings in Hadoop's core-site.xml file
+    // and Hive's hive-site.xml file. Note, we load hive-site.xml file manually in
+    // SharedState and put settings in this hadoopConf instead of relying on HiveConf
+    // to load user settings. Otherwise, HiveConf's initialize method will override
+    // settings in the hadoopConf. This issue only shows up when spark.sql.hive.metastore.jars
+    // is not set to builtin. When spark.sql.hive.metastore.jars is builtin, the classpath
+    // has hive-site.xml. So, HiveConf will use that to override its default values.
+    // 2: we set all spark confs to this hiveConf.
+    // 3: we set all entries in config to this hiveConf.
+    (hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
+      ++ sparkConf.getAll.toMap ++ extraConfig).foreach { case (k, v) =>
+      logDebug(
+        s"""
+           |Applying Hadoop/Hive/Spark and extra properties to Hive Conf:
+           |$k=${if (k.toLowerCase(Locale.ROOT).contains("password")) "xxx" else v}
+         """.stripMargin)
+      hiveConf.set(k, v)
+    }
+    val state = new SessionState(hiveConf)
+    if (clientLoader.cachedHive != null) {
+      Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+    }
+    SessionState.start(state)
+    state.out = new PrintStream(outputBuffer, true, "UTF-8")
+    state.err = new PrintStream(outputBuffer, true, "UTF-8")
+    state
+  }
+
   /** Returns the configuration for the current session. */
   def conf: HiveConf = state.getConf
 
@@ -269,6 +240,9 @@ private[hive] class HiveClientImpl(
     }
   }
 
+  /** Return the associated Hive [[SessionState]] of this [[HiveClientImpl]] */
+  override def getState: SessionState = withHiveState(state)
+
   /**
    * Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
    */
--
GitLab