From 5d80fac58f837933b5359a8057676f45539e53af Mon Sep 17 00:00:00 2001
From: Daoyuan Wang <daoyuan.wang@intel.com>
Date: Mon, 22 Feb 2016 18:13:32 -0800
Subject: [PATCH] [SPARK-11624][SPARK-11972][SQL] fix commands that need hive to exec

In SparkSQLCLI, we have created a `CliSessionState`, but then we call
`SparkSQLEnv.init()`, which starts another `SessionState`. This leads to an
exception, because `processCmd` needs to get the `CliSessionState` instance by
calling `SessionState.get()`, but the returned value is an instance of plain
`SessionState`. See the exception below.

    spark-sql> !echo "test";
    Exception in thread "main" java.lang.ClassCastException: org.apache.hadoop.hive.ql.session.SessionState cannot be cast to org.apache.hadoop.hive.cli.CliSessionState
        at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:112)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:301)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:242)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:691)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #9589 from adrian-wang/clicommand.
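The diff below makes `HiveClientImpl` reuse an already-started `CliSessionState`
instead of constructing a second `SessionState`. A minimal standalone Scala
sketch of that reuse pattern, for orientation only (the names `SessionStateReuse`,
`getOrCreate`, and `createFreshState` are illustrative and not part of the patch):

    import org.apache.hadoop.hive.cli.CliSessionState
    import org.apache.hadoop.hive.ql.session.SessionState

    object SessionStateReuse {
      // If the CLI already started a CliSessionState on this thread, return it,
      // so later SessionState.get() calls in CliDriver still see the CLI
      // subclass (and keep its command-line configuration). Otherwise fall back
      // to the caller-supplied builder; a null thread-local also takes this path,
      // since a typed pattern never matches null.
      def getOrCreate(createFreshState: () => SessionState): SessionState =
        SessionState.get() match {
          case cli: CliSessionState => cli   // reuse the CLI-created session
          case _ => createFreshState()       // hypothetical fresh-state builder
        }
    }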
---
 .../hive/thriftserver/SparkSQLCLIDriver.scala      |  7 ++-
 .../sql/hive/thriftserver/CliSuite.scala           |  5 ++
 sql/hive/pom.xml                                   |  6 +++
 .../sql/hive/client/HiveClientImpl.scala           | 54 +++++++++++--------
 4 files changed, 48 insertions(+), 24 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index f279b78f47..fb31119a9e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -288,8 +288,11 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
     val tokens: Array[String] = cmd_trimmed.split("\\s+")
     val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
     if (cmd_lower.equals("quit") ||
-      cmd_lower.equals("exit") ||
-      tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
+      cmd_lower.equals("exit")) {
+      sessionState.close()
+      System.exit(0)
+    }
+    if (tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
       cmd_trimmed.startsWith("!") ||
       tokens(0).toLowerCase.equals("list") ||
       isRemoteMode) {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 72da266da4..81508e1346 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -234,4 +234,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
         -> "Error in query: Table not found: nonexistent_table;"
     )
   }
+
+  test("SPARK-11624 Spark SQL CLI should set sessionState only once") {
+    runCliWithin(2.minute, Seq("-e", "!echo \"This is a test for Spark-11624\";"))(
+      "" -> "This is a test for Spark-11624")
+  }
 }
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 14cf9acf09..22bad93e6d 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -72,6 +72,12 @@
       <artifactId>protobuf-java</artifactId>
       <version>${protobuf.version}</version>
     </dependency>
+-->
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-cli</artifactId>
+    </dependency>
+<!--
     <dependency>
       <groupId>${hive.group}</groupId>
       <artifactId>hive-common</artifactId>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 7a007d2acc..5d62854c40 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.language.reflectiveCalls
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.cli.CliSessionState
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceUri}
@@ -31,7 +32,6 @@ import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Tab
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{Logging, SparkConf, SparkException}
@@ -105,29 +105,39 @@ private[hive] class HiveClientImpl(
     }
 
     val ret = try {
-      val initialConf = new HiveConf(classOf[SessionState])
-      // HiveConf is a Hadoop Configuration, which has a field of classLoader and
-      // the initial value will be the current thread's context class loader
-      // (i.e. initClassLoader at here).
-      // We call initialConf.setClassLoader(initClassLoader) at here to make
-      // this action explicit.
-      initialConf.setClassLoader(initClassLoader)
-      config.foreach { case (k, v) =>
-        if (k.toLowerCase.contains("password")) {
-          logDebug(s"Hive Config: $k=xxx")
-        } else {
-          logDebug(s"Hive Config: $k=$v")
+      // originState will be created if not exists, will never be null
+      val originalState = SessionState.get()
+      if (originalState.isInstanceOf[CliSessionState]) {
+        // In `SparkSQLCLIDriver`, we have already started a `CliSessionState`,
+        // which contains information like configurations from command line. Later
+        // we call `SparkSQLEnv.init()` there, which would run into this part again.
+        // so we should keep `conf` and reuse the existing instance of `CliSessionState`.
+        originalState
+      } else {
+        val initialConf = new HiveConf(classOf[SessionState])
+        // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+        // the initial value will be the current thread's context class loader
+        // (i.e. initClassLoader at here).
+        // We call initialConf.setClassLoader(initClassLoader) at here to make
+        // this action explicit.
+        initialConf.setClassLoader(initClassLoader)
+        config.foreach { case (k, v) =>
+          if (k.toLowerCase.contains("password")) {
+            logDebug(s"Hive Config: $k=xxx")
+          } else {
+            logDebug(s"Hive Config: $k=$v")
+          }
+          initialConf.set(k, v)
         }
-        initialConf.set(k, v)
-      }
-      val state = new SessionState(initialConf)
-      if (clientLoader.cachedHive != null) {
-        Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+        val state = new SessionState(initialConf)
+        if (clientLoader.cachedHive != null) {
+          Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+        }
+        SessionState.start(state)
+        state.out = new PrintStream(outputBuffer, true, "UTF-8")
+        state.err = new PrintStream(outputBuffer, true, "UTF-8")
+        state
       }
-      SessionState.start(state)
-      state.out = new PrintStream(outputBuffer, true, "UTF-8")
-      state.err = new PrintStream(outputBuffer, true, "UTF-8")
-      state
     } finally {
       Thread.currentThread().setContextClassLoader(original)
     }
--
GitLab
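The new CliSuite test drives this code path end to end. Assuming a local Spark
build with Hive support, the same check can be run by hand (the `!` prefix hands
the command to the shell through the `CliSessionState`):

    ./bin/spark-sql -e '!echo "test";'

Before this patch the command failed with the ClassCastException quoted in the
commit message; with it, the CLI prints `test`.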