From 054f991c4350af1350af7a4109ee77f4a34822f0 Mon Sep 17 00:00:00 2001 From: Reynold Xin <rxin@databricks.com> Date: Fri, 29 Apr 2016 01:14:02 -0700 Subject: [PATCH] [SPARK-14994][SQL] Remove execution hive from HiveSessionState ## What changes were proposed in this pull request? This patch removes executionHive from HiveSessionState and HiveSharedState. ## How was this patch tested? Updated test cases. Author: Reynold Xin <rxin@databricks.com> Author: Yin Huai <yhuai@databricks.com> Closes #12770 from rxin/SPARK-14994. --- .../spark/sql/catalyst/CatalystConf.scala | 5 +- .../sql/catalyst/catalog/SessionCatalog.scala | 2 +- .../apache/spark/sql/internal/SQLConf.scala | 9 ++ .../spark/sql/internal/SessionState.scala | 22 ++- .../spark/sql/internal/SharedState.scala | 18 +++ .../sql/execution/command/DDLSuite.scala | 152 ++++++++++-------- .../HiveServerServerOptionsProcessor.java | 85 ++++++++++ .../HiveServerServerOptionsProcessor.scala | 37 ----- .../hive/thriftserver/HiveThriftServer2.scala | 21 ++- .../SparkExecuteStatementOperation.scala | 5 +- .../sql/hive/thriftserver/SparkSQLEnv.scala | 6 - .../spark/sql/hive/HiveSessionCatalog.scala | 10 +- .../spark/sql/hive/HiveSessionState.scala | 34 ++-- .../spark/sql/hive/HiveSharedState.scala | 7 - .../hive/client/IsolatedClientLoader.scala | 12 +- .../hive/execution/InsertIntoHiveTable.scala | 67 +++++++- .../apache/spark/sql/hive/test/TestHive.scala | 27 ++-- .../spark/sql/hive/HiveContextSuite.scala | 40 ----- .../spark/sql/hive/SerializationSuite.scala | 32 ---- .../sql/hive/execution/HiveQuerySuite.scala | 45 ------ 20 files changed, 327 insertions(+), 309 deletions(-) create mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.java delete mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala index 179dab11a2..4df100c2a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala @@ -34,6 +34,8 @@ trait CatalystConf { def runSQLonFile: Boolean + def warehousePath: String + /** * Returns the [[Resolver]] for the current configuration, which can be used to determine if two * identifiers are equal. 
@@ -52,5 +54,6 @@ case class SimpleCatalystConf( optimizerMaxIterations: Int = 100, optimizerInSetConversionThreshold: Int = 10, maxCaseBranchesForCodegen: Int = 20, - runSQLonFile: Boolean = true) + runSQLonFile: Boolean = true, + warehousePath: String = "/user/hive/warehouse") extends CatalystConf diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index d7fd54308a..b06f24bc48 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -125,7 +125,7 @@ class SessionCatalog( } def getDefaultDBPath(db: String): String = { - System.getProperty("java.io.tmpdir") + File.separator + db + ".db" + new Path(new Path(conf.warehousePath), db + ".db").toString } // ---------------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6fbf32676f..2bfc895678 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -52,6 +52,11 @@ object SQLConf { } + val WAREHOUSE_PATH = SQLConfigBuilder("spark.sql.warehouse.dir") + .doc("The default location for managed databases and tables.") + .stringConf + .createWithDefault("${system:user.dir}/spark-warehouse") + val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations") .internal() .doc("The max number of iterations the optimizer and analyzer runs.") @@ -645,6 +650,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH) + def warehousePath: String = { + getConf(WAREHOUSE_PATH).replace("${system:user.dir}", System.getProperty("user.dir")) + } + override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL) override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index c05fe37886..cacf50ec7a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -17,11 +17,13 @@ package org.apache.spark.sql.internal +import java.io.File import java.util.Properties import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.sql._ @@ -65,9 +67,6 @@ private[sql] class SessionState(sparkSession: SparkSession) { hadoopConf } - // Automatically extract `spark.sql.*` entries and put it in our SQLConf - setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf)) - lazy val experimentalMethods = new ExperimentalMethods /** @@ -150,6 +149,12 @@ private[sql] class SessionState(sparkSession: SparkSession) { new ContinuousQueryManager(sparkSession) } + private val jarClassLoader: NonClosableMutableURLClassLoader = + sparkSession.sharedState.jarClassLoader + + // Automatically extract `spark.sql.*` entries and put it in our SQLConf + // We need to call it after all of vals have been initialized. 
+ setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf)) // ------------------------------------------------------ // Helper methods, partially leftover from pre-2.0 days @@ -180,6 +185,17 @@ private[sql] class SessionState(sparkSession: SparkSession) { def addJar(path: String): Unit = { sparkSession.sparkContext.addJar(path) + + val uri = new Path(path).toUri + val jarURL = if (uri.getScheme == null) { + // `path` is a local file path without a URL scheme + new File(path).toURI.toURL + } else { + // `path` is a URL with a scheme + uri.toURL + } + jarClassLoader.addURL(jarURL) + Thread.currentThread().setContextClassLoader(jarClassLoader) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 9a30c7de1f..ab4af8d142 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog} import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.ui.SQLListener +import org.apache.spark.util.MutableURLClassLoader /** @@ -44,4 +45,21 @@ private[sql] class SharedState(val sparkContext: SparkContext) { */ lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog + /** + * A classloader used to load all user-added jar. + */ + val jarClassLoader = new NonClosableMutableURLClassLoader( + org.apache.spark.util.Utils.getContextOrSparkClassLoader) + +} + + +/** + * URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader. + * This class loader cannot be closed (its `close` method is a no-op). 
+ */ +private[sql] class NonClosableMutableURLClassLoader(parent: ClassLoader) + extends MutableURLClassLoader(Array.empty, parent) { + + override def close(): Unit = {} } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index a9a9bf76be..4162329d76 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { @@ -83,91 +84,100 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } test("Create/Drop Database") { - val catalog = sqlContext.sessionState.catalog + withSQLConf( + SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) { + val catalog = sqlContext.sessionState.catalog - val databaseNames = Seq("db1", "`database`") + val databaseNames = Seq("db1", "`database`") - databaseNames.foreach { dbName => - try { - val dbNameWithoutBackTicks = cleanIdentifier(dbName) - - sql(s"CREATE DATABASE $dbName") - val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks) - assert(db1 == CatalogDatabase( - dbNameWithoutBackTicks, - "", - System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db", - Map.empty)) - sql(s"DROP DATABASE $dbName CASCADE") - assert(!catalog.databaseExists(dbNameWithoutBackTicks)) - } finally { - catalog.reset() + databaseNames.foreach { dbName => + try { + val dbNameWithoutBackTicks = cleanIdentifier(dbName) + + sql(s"CREATE DATABASE $dbName") + val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks) + assert(db1 == CatalogDatabase( + dbNameWithoutBackTicks, + "", + System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db", + Map.empty)) + sql(s"DROP DATABASE $dbName CASCADE") + assert(!catalog.databaseExists(dbNameWithoutBackTicks)) + } finally { + catalog.reset() + } } } } test("Create Database - database already exists") { - val catalog = sqlContext.sessionState.catalog - val databaseNames = Seq("db1", "`database`") - - databaseNames.foreach { dbName => - try { - val dbNameWithoutBackTicks = cleanIdentifier(dbName) - sql(s"CREATE DATABASE $dbName") - val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks) - assert(db1 == CatalogDatabase( - dbNameWithoutBackTicks, - "", - System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db", - Map.empty)) - - val message = intercept[AnalysisException] { + withSQLConf( + SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) { + val catalog = sqlContext.sessionState.catalog + val databaseNames = Seq("db1", "`database`") + + databaseNames.foreach { dbName => + try { + val dbNameWithoutBackTicks = cleanIdentifier(dbName) sql(s"CREATE DATABASE $dbName") - }.getMessage - assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists.")) - } finally { - catalog.reset() + val db1 = 
catalog.getDatabaseMetadata(dbNameWithoutBackTicks) + assert(db1 == CatalogDatabase( + dbNameWithoutBackTicks, + "", + System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db", + Map.empty)) + + val message = intercept[AnalysisException] { + sql(s"CREATE DATABASE $dbName") + }.getMessage + assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists.")) + } finally { + catalog.reset() + } } } } test("Alter/Describe Database") { - val catalog = sqlContext.sessionState.catalog - val databaseNames = Seq("db1", "`database`") - - databaseNames.foreach { dbName => - try { - val dbNameWithoutBackTicks = cleanIdentifier(dbName) - val location = - System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db" - sql(s"CREATE DATABASE $dbName") - - checkAnswer( - sql(s"DESCRIBE DATABASE EXTENDED $dbName"), - Row("Database Name", dbNameWithoutBackTicks) :: - Row("Description", "") :: - Row("Location", location) :: - Row("Properties", "") :: Nil) - - sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')") - - checkAnswer( - sql(s"DESCRIBE DATABASE EXTENDED $dbName"), - Row("Database Name", dbNameWithoutBackTicks) :: - Row("Description", "") :: - Row("Location", location) :: - Row("Properties", "((a,a), (b,b), (c,c))") :: Nil) - - sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')") + withSQLConf( + SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) { + val catalog = sqlContext.sessionState.catalog + val databaseNames = Seq("db1", "`database`") + + databaseNames.foreach { dbName => + try { + val dbNameWithoutBackTicks = cleanIdentifier(dbName) + val location = + System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db" + sql(s"CREATE DATABASE $dbName") - checkAnswer( - sql(s"DESCRIBE DATABASE EXTENDED $dbName"), - Row("Database Name", dbNameWithoutBackTicks) :: - Row("Description", "") :: - Row("Location", location) :: - Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil) - } finally { - catalog.reset() + checkAnswer( + sql(s"DESCRIBE DATABASE EXTENDED $dbName"), + Row("Database Name", dbNameWithoutBackTicks) :: + Row("Description", "") :: + Row("Location", location) :: + Row("Properties", "") :: Nil) + + sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')") + + checkAnswer( + sql(s"DESCRIBE DATABASE EXTENDED $dbName"), + Row("Database Name", dbNameWithoutBackTicks) :: + Row("Description", "") :: + Row("Location", location) :: + Row("Properties", "((a,a), (b,b), (c,c))") :: Nil) + + sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')") + + checkAnswer( + sql(s"DESCRIBE DATABASE EXTENDED $dbName"), + Row("Database Name", dbNameWithoutBackTicks) :: + Row("Description", "") :: + Row("Location", location) :: + Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil) + } finally { + catalog.reset() + } } } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.java b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.java new file mode 100644 index 0000000000..0f2683db07 --- /dev/null +++ b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.server; + +import java.util.Properties; + +import org.apache.commons.cli.*; + +public class HiveServerServerOptionsProcessor { + private final Options options = new Options(); + private org.apache.commons.cli.CommandLine commandLine; + private final String serverName; + private final StringBuilder debugMessage = new StringBuilder(); + + @SuppressWarnings("static-access") + public HiveServerServerOptionsProcessor(String serverName) { + this.serverName = serverName; + // -hiveconf x=y + options.addOption(OptionBuilder + .withValueSeparator() + .hasArgs(2) + .withArgName("property=value") + .withLongOpt("hiveconf") + .withDescription("Use value for given property") + .create()); + // -deregister <versionNumber> + options.addOption(OptionBuilder + .hasArgs(1) + .withArgName("versionNumber") + .withLongOpt("deregister") + .withDescription("Deregister all instances of given version from dynamic service discovery") + .create()); + options.addOption(new Option("H", "help", false, "Print help information")); + } + + public HiveServer2.ServerOptionsProcessorResponse parse(String[] argv) { + try { + commandLine = new GnuParser().parse(options, argv); + // Process --hiveconf + // Get hiveconf param values and set the System property values + Properties confProps = commandLine.getOptionProperties("hiveconf"); + for (String propKey : confProps.stringPropertyNames()) { + // save logging message for log4j output latter after log4j initialize properly + debugMessage.append("Setting " + propKey + "=" + confProps.getProperty(propKey) + ";\n"); + // System.setProperty("hivecli." 
+ propKey, confProps.getProperty(propKey)); + System.setProperty(propKey, confProps.getProperty(propKey)); + } + + // Process --help + if (commandLine.hasOption('H')) { + return new HiveServer2.ServerOptionsProcessorResponse( + new HiveServer2.HelpOptionExecutor(serverName, options)); + } + + // Process --deregister + if (commandLine.hasOption("deregister")) { + return new HiveServer2.ServerOptionsProcessorResponse( + new HiveServer2.DeregisterOptionExecutor( + commandLine.getOptionValue("deregister"))); + } + } catch (ParseException e) { + // Error out & exit - we were not able to parse the args successfully + System.err.println("Error starting HiveServer2 with given arguments: "); + System.err.println(e.getMessage()); + System.exit(-1); + } + // Default executor, when no option is specified + return new HiveServer2.ServerOptionsProcessorResponse(new HiveServer2.StartOptionExecutor()); + } +} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala deleted file mode 100644 index 60bb4dc5e7..0000000000 --- a/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hive.service.server - -import org.apache.hive.service.server.HiveServer2.{ServerOptionsProcessor, StartOptionExecutor} - -/** - * Class to upgrade a package-private class to public, and - * implement a `process()` operation consistent with - * the behavior of older Hive versions - * @param serverName name of the hive server - */ -private[apache] class HiveServerServerOptionsProcessor(serverName: String) - extends ServerOptionsProcessor(serverName) { - - def process(args: Array[String]): Boolean = { - // A parse failure automatically triggers a system exit - val response = super.parse(args) - val executor = response.getServerOptionsExecutor() - // return true if the parsed option was to start the service - executor.isInstanceOf[StartOptionExecutor] - } -} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 24a25023a6..03727b8ab2 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -22,8 +22,10 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ import org.apache.commons.logging.LogFactory +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService} @@ -34,7 +36,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart} import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hive.HiveSessionState +import org.apache.spark.sql.hive.{HiveSharedState, HiveUtils} import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab import org.apache.spark.sql.internal.SQLConf @@ -56,7 +58,12 @@ object HiveThriftServer2 extends Logging { @DeveloperApi def startWithContext(sqlContext: SQLContext): Unit = { val server = new HiveThriftServer2(sqlContext) - server.init(sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf) + + val executionHive = HiveUtils.newClientForExecution( + sqlContext.sparkContext.conf, + sqlContext.sessionState.newHadoopConf()) + + server.init(executionHive.conf) server.start() listener = new HiveThriftServer2Listener(server, sqlContext.conf) sqlContext.sparkContext.addSparkListener(listener) @@ -70,9 +77,7 @@ object HiveThriftServer2 extends Logging { def main(args: Array[String]) { Utils.initDaemon(log) val optionsProcessor = new HiveServerServerOptionsProcessor("HiveThriftServer2") - if (!optionsProcessor.process(args)) { - System.exit(-1) - } + optionsProcessor.parse(args) logInfo("Starting SparkContext") SparkSQLEnv.init() @@ -82,9 +87,13 @@ object HiveThriftServer2 extends Logging { uiTab.foreach(_.detach()) } + val executionHive = HiveUtils.newClientForExecution( + SparkSQLEnv.sqlContext.sparkContext.conf, + SparkSQLEnv.sqlContext.sessionState.newHadoopConf()) + try { val server = new HiveThriftServer2(SparkSQLEnv.sqlContext) - server.init(SparkSQLEnv.sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf) + 
server.init(executionHive.conf) server.start() logInfo("HiveThriftServer2 started") listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 40dc81e02d..e8bcdd76ef 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -35,7 +35,7 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext} import org.apache.spark.sql.execution.command.SetCommand -import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils} +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.{Utils => SparkUtils} @@ -195,9 +195,8 @@ private[hive] class SparkExecuteStatementOperation( statementId = UUID.randomUUID().toString logInfo(s"Running query '$statement' with $statementId") setState(OperationState.RUNNING) - val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] // Always use the latest class loader provided by executionHive's state. - val executionHiveClassLoader = sessionState.executionHive.state.getConf.getClassLoader + val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) HiveThriftServer2.listener.onStatementStart( diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 268ba2f0bc..665a44e51a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -60,13 +60,7 @@ private[hive] object SparkSQLEnv extends Logging { sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8")) sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8")) sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8")) - sqlContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion) - - if (log.isDebugEnabled) { - sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted - .foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") } - } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index f70131ec86..456587e0e0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -20,8 +20,6 @@ package org.apache.spark.sql.hive import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal -import org.apache.hadoop.fs.Path -import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.exec.{UDAF, UDF} import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry} import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, 
GenericUDF, GenericUDTF} @@ -46,8 +44,7 @@ private[sql] class HiveSessionCatalog( sparkSession: SparkSession, functionResourceLoader: FunctionResourceLoader, functionRegistry: FunctionRegistry, - conf: SQLConf, - hiveconf: HiveConf) + conf: SQLConf) extends SessionCatalog(externalCatalog, functionResourceLoader, functionRegistry, conf) { override def setCurrentDatabase(db: String): Unit = { @@ -73,11 +70,6 @@ private[sql] class HiveSessionCatalog( // | Methods and fields for interacting with HiveMetastoreCatalog | // ---------------------------------------------------------------- - override def getDefaultDBPath(db: String): String = { - val defaultPath = hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE) - new Path(new Path(defaultPath), db + ".db").toString - } - // Catalog for handling data source tables. TODO: This really doesn't belong here since it is // essentially a cache for metastore tables. However, it relies on a lot of session-specific // things so it would be a lot of work to split its functionality between HiveSessionCatalog diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index e085094383..9608f0b4ef 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.hive -import java.util.regex.Pattern - import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars @@ -26,7 +24,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.Analyzer import org.apache.spark.sql.execution.SparkPlanner import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl} +import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.SessionState @@ -42,11 +40,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) sparkSession.sharedState.asInstanceOf[HiveSharedState] } - /** - * A Hive client used for execution. - */ - lazy val executionHive: HiveClientImpl = sharedState.executionHive.newSession() - /** * A Hive client used for interacting with the metastore. */ @@ -61,9 +54,20 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) * set in the SQLConf *as well as* in the HiveConf. */ lazy val hiveconf: HiveConf = { - val c = executionHive.conf - conf.setConf(c.getAllProperties) - c + val initialConf = new HiveConf( + sparkSession.sparkContext.hadoopConfiguration, + classOf[org.apache.hadoop.hive.ql.session.SessionState]) + + // HiveConf is a Hadoop Configuration, which has a field of classLoader and + // the initial value will be the current thread's context class loader + // (i.e. initClassLoader at here). + // We call initialConf.setClassLoader(initClassLoader) at here to make + // this action explicit. 
+ initialConf.setClassLoader(sparkSession.sharedState.jarClassLoader) + sparkSession.sparkContext.conf.getAll.foreach { case (k, v) => + initialConf.set(k, v) + } + initialConf } setDefaultOverrideConfs() @@ -78,8 +82,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) sparkSession, functionResourceLoader, functionRegistry, - conf, - hiveconf) + conf) } /** @@ -141,16 +144,13 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) override def setConf(key: String, value: String): Unit = { super.setConf(key, value) - executionHive.runSqlHive(s"SET $key=$value") metadataHive.runSqlHive(s"SET $key=$value") hiveconf.set(key, value) } override def addJar(path: String): Unit = { - super.addJar(path) - executionHive.addJar(path) metadataHive.addJar(path) - Thread.currentThread().setContextClassLoader(executionHive.clientLoader.classLoader) + super.addJar(path) } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala index fb1f59eed3..0ea5ce9196 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala @@ -31,13 +31,6 @@ private[hive] class HiveSharedState(override val sparkContext: SparkContext) // TODO: just share the IsolatedClientLoader instead of the client instances themselves - /** - * A Hive client used for execution. - */ - val executionHive: HiveClientImpl = { - HiveUtils.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration) - } - /** * A Hive client used to interact with the metastore. */ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 0380d2342b..e1950d181d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -33,6 +33,7 @@ import org.apache.spark.deploy.SparkSubmitUtils import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.hive.HiveUtils +import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader import org.apache.spark.util.{MutableURLClassLoader, Utils} /** Factory for `IsolatedClientLoader` with specific versions of hive. */ @@ -278,14 +279,3 @@ private[hive] class IsolatedClientLoader( */ private[hive] var cachedHive: Any = null } - -/** - * URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader. - * This class loader cannot be closed (its `close` method is a no-op). 
- */ -private[sql] class NonClosableMutableURLClassLoader( - parent: ClassLoader) - extends MutableURLClassLoader(Array.empty, parent) { - - override def close(): Unit = {} -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index cba10caf98..73ccec2ee0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -17,11 +17,19 @@ package org.apache.spark.sql.hive.execution +import java.io.IOException +import java.net.URI +import java.text.SimpleDateFormat import java.util +import java.util.{Date, Random} import scala.collection.JavaConverters._ -import org.apache.hadoop.hive.ql.{Context, ErrorMsg} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.common.FileUtils +import org.apache.hadoop.hive.ql.exec.TaskRunner +import org.apache.hadoop.hive.ql.ErrorMsg import org.apache.hadoop.mapred.{FileOutputFormat, JobConf} import org.apache.spark.rdd.RDD @@ -46,6 +54,61 @@ case class InsertIntoHiveTable( def output: Seq[Attribute] = Seq.empty + val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging") + + private def executionId: String = { + val rand: Random = new Random + val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS") + val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong) + return executionId + } + + private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = { + val inputPathUri: URI = inputPath.toUri + val inputPathName: String = inputPathUri.getPath + val fs: FileSystem = inputPath.getFileSystem(hadoopConf) + val stagingPathName: String = + if (inputPathName.indexOf(stagingDir) == -1) { + new Path(inputPathName, stagingDir).toString + } else { + inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length) + } + val dir: Path = + fs.makeQualified( + new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID)) + logDebug("Created staging dir = " + dir + " for path = " + inputPath) + try { + if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { + throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") + } + fs.deleteOnExit(dir) + } + catch { + case e: IOException => + throw new RuntimeException( + "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e) + + } + return dir + } + + private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = { + getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf) + } + + def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = { + val extURI: URI = path.toUri + if (extURI.getScheme == "viewfs") { + getExtTmpPathRelTo(path.getParent, hadoopConf) + } else { + new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000") + } + } + + def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = { + new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000 + } + private def saveAsHiveFile( rdd: RDD[InternalRow], valueClass: Class[_], @@ -81,7 +144,7 @@ case class InsertIntoHiveTable( val tableDesc = table.tableDesc val tableLocation = table.hiveQlTable.getDataLocation val hadoopConf = sessionState.newHadoopConf() - val 
tmpLocation = new Context(hadoopConf).getExternalTmpPath(tableLocation) + val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) val isCompressed = sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index ddb72fb1e1..c4a3a74b9b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -136,7 +136,8 @@ private[hive] class TestHiveSparkSession( } @transient - override lazy val sessionState: TestHiveSessionState = new TestHiveSessionState(self) + override lazy val sessionState: TestHiveSessionState = + new TestHiveSessionState(self, warehousePath) override def newSession(): TestHiveSparkSession = { new TestHiveSparkSession( @@ -156,19 +157,8 @@ private[hive] class TestHiveSparkSession( sessionState.hiveconf.set("hive.plan.serialization.format", "javaXML") - // A snapshot of the entries in the starting SQLConf - // We save this because tests can mutate this singleton object if they want - // This snapshot is saved when we create this TestHiveSparkSession. - val initialSQLConf: SQLConf = { - val snapshot = new SQLConf - sessionState.conf.getAllConfs.foreach { case (k, v) => snapshot.setConfString(k, v) } - snapshot - } - - val testTempDir = Utils.createTempDir() - // For some hive test case which contain ${system:test.tmp.dir} - System.setProperty("test.tmp.dir", testTempDir.getCanonicalPath) + System.setProperty("test.tmp.dir", Utils.createTempDir().getCanonicalPath) /** The location of the compiled hive distribution */ lazy val hiveHome = envVarToFile("HIVE_HOME") @@ -521,8 +511,10 @@ private[hive] class TestHiveSharedState( } -private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession) - extends HiveSessionState(sparkSession) { +private[hive] class TestHiveSessionState( + sparkSession: TestHiveSparkSession, + warehousePath: File) + extends HiveSessionState(sparkSession) { self => override lazy val conf: SQLConf = { new SQLConf { @@ -530,9 +522,8 @@ private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession) override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false) override def clear(): Unit = { super.clear() - TestHiveContext.overrideConfs.map { - case (key, value) => setConfString(key, value) - } + TestHiveContext.overrideConfs.foreach { case (k, v) => setConfString(k, v) } + setConfString("hive.metastore.warehouse.dir", self.warehousePath.toURI.toString) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala deleted file mode 100644 index b2c0f7e0e5..0000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. 
You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.spark.sql.hive - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.hive.test.TestHive - - -class HiveContextSuite extends SparkFunSuite { - - test("HiveContext can access `spark.sql.*` configs") { - // Avoid creating another SparkContext in the same JVM - val sc = TestHive.sparkContext - require(sc.conf.get("spark.sql.hive.metastore.barrierPrefixes") == - "org.apache.spark.sql.hive.execution.PairSerDe") - assert(TestHive.sparkSession.initialSQLConf.getConfString( - "spark.sql.hive.metastore.barrierPrefixes") == - "org.apache.spark.sql.hive.execution.PairSerDe") - // This setting should be also set in the hiveconf of the current session. - assert(TestHive.sessionState.hiveconf.get( - "spark.sql.hive.metastore.barrierPrefixes", "") == - "org.apache.spark.sql.hive.execution.PairSerDe") - } - -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala deleted file mode 100644 index ac3a65032f..0000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.serializer.JavaSerializer - -class SerializationSuite extends SparkFunSuite { - - test("[SPARK-5840] HiveContext should be serializable") { - val hiveContext = org.apache.spark.sql.hive.test.TestHive - hiveContext.sessionState.hiveconf - val serializer = new JavaSerializer(new SparkConf()).newInstance() - val bytes = serializer.serialize(hiveContext) - val deSer = serializer.deserialize[AnyRef](bytes) - } -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index e5a7706cc5..3bf0e84267 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1070,51 +1070,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { assert(getConf(testKey, "0") == "") } - test("SET commands semantics for a HiveContext") { - // Adapted from its SQL counterpart. 
- val testKey = "spark.sql.key.usedfortestonly" - val testVal = "test.val.0" - val nonexistentKey = "nonexistent" - def collectResults(df: DataFrame): Set[Any] = - df.collect().map { - case Row(key: String, value: String) => key -> value - case Row(key: String, defaultValue: String, doc: String) => (key, defaultValue, doc) - }.toSet - conf.clear() - - val expectedConfs = conf.getAllDefinedConfs.toSet - assertResult(expectedConfs)(collectResults(sql("SET -v"))) - - // "SET" itself returns all config variables currently specified in SQLConf. - // TODO: Should we be listing the default here always? probably... - assert(sql("SET").collect().size === TestHiveContext.overrideConfs.size) - - val defaults = collectResults(sql("SET")) - assertResult(Set(testKey -> testVal)) { - collectResults(sql(s"SET $testKey=$testVal")) - } - - assert(sessionState.hiveconf.get(testKey, "") === testVal) - assertResult(defaults ++ Set(testKey -> testVal))(collectResults(sql("SET"))) - - sql(s"SET ${testKey + testKey}=${testVal + testVal}") - assert(sessionState.hiveconf.get(testKey + testKey, "") == testVal + testVal) - assertResult(defaults ++ Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) { - collectResults(sql("SET")) - } - - // "SET key" - assertResult(Set(testKey -> testVal)) { - collectResults(sql(s"SET $testKey")) - } - - assertResult(Set(nonexistentKey -> "<undefined>")) { - collectResults(sql(s"SET $nonexistentKey")) - } - - conf.clear() - } - test("current_database with multiple sessions") { sql("create database a") sql("use a") -- GitLab
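
Below is a small, dependency-free Scala sketch of the jar-classloader behavior this patch consolidates into SharedState.jarClassLoader and SessionState.addJar. It is illustrative only, not the Spark code itself: java.net.URI stands in for org.apache.hadoop.fs.Path, the MutableURLClassLoader here is a simplified stand-in for Spark's utility class, and the jar paths used in main() are hypothetical.

import java.io.File
import java.net.{URI, URL, URLClassLoader}

// Simplified stand-in for Spark's MutableURLClassLoader: a URLClassLoader
// whose addURL is public so jars can be appended after construction.
class MutableURLClassLoader(urls: Array[URL], parent: ClassLoader)
  extends URLClassLoader(urls, parent) {
  override def addURL(url: URL): Unit = super.addURL(url)
}

// Mirrors the NonClosableMutableURLClassLoader this patch moves from
// IsolatedClientLoader.scala into SharedState.scala: close() is a no-op,
// so user-added jars stay resolvable for the lifetime of the shared state.
class NonClosableMutableURLClassLoader(parent: ClassLoader)
  extends MutableURLClassLoader(Array.empty, parent) {
  override def close(): Unit = {}
}

object AddJarSketch {
  // One shared loader, analogous to SharedState.jarClassLoader.
  val jarClassLoader =
    new NonClosableMutableURLClassLoader(getClass.getClassLoader)

  // Same resolution rule as the new SessionState.addJar, minus the
  // sparkContext.addJar call and with java.net.URI replacing Hadoop's Path.
  def addJar(path: String): Unit = {
    val uri = new URI(path)
    val jarURL = if (uri.getScheme == null) {
      // `path` is a local file path without a URL scheme
      new File(path).toURI.toURL
    } else {
      // `path` is already a URL with a scheme
      uri.toURL
    }
    jarClassLoader.addURL(jarURL)
    // Make the jar visible to code that resolves classes via the
    // thread context class loader.
    Thread.currentThread().setContextClassLoader(jarClassLoader)
  }

  def main(args: Array[String]): Unit = {
    addJar("/tmp/example-udfs.jar")          // hypothetical local jar path
    addJar("file:///tmp/example-udfs.jar")   // same jar given as a URI
    jarClassLoader.getURLs.foreach(println)
  }
}

The design point mirrored here is the no-op close(): because the loader is shared across sessions and also handed to HiveConf via setClassLoader, jars added through addJar must remain loadable even if a component tries to close the loader.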