Commit 5b99bf24 authored by Cheng Lian, committed by Patrick Wendell

[SPARK-4645][SQL] Disables asynchronous execution in Hive 0.13.1 HiveThriftServer2

This PR disables asynchronous execution in HiveThriftServer2 by setting the `runInBackground` argument of `ExecuteStatementOperation` to `false` and by reverting `SparkExecuteStatementOperation.run` in the Hive 13 shim to the Hive 12 version. This change makes the Simba ODBC driver v1.0.0.1000 work.
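
A minimal, self-contained sketch of the behavioural difference (hypothetical stand-ins only, not the HiveServer2 API): with `runInBackground` forced to `false`, the statement runs on the calling Thrift handler thread instead of being handed to a background executor, so control returns to the client only after the query has finished.

```scala
import java.util.concurrent.Executors

// Toy model of the control flow that `runInBackground` switches between; the real
// implementation is SparkExecuteStatementOperation in the diff below.
object AsyncVsSyncSketch {
  def executeStatement(statement: String): Unit = {
    Thread.sleep(100) // stand-in for the actual Spark job
    println(s"finished: $statement")
  }

  def run(statement: String, runInBackground: Boolean): Unit =
    if (runInBackground) {
      // Old asynchronous path: hand the work to a background thread and return
      // immediately; the client has to poll the operation state afterwards.
      val pool = Executors.newSingleThreadExecutor()
      pool.submit(new Runnable { def run(): Unit = executeStatement(statement) })
      pool.shutdown()
      println(s"returned before '$statement' completed")
    } else {
      // Path forced by this PR: block the calling thread until the statement finishes,
      // which is what the ODBC driver expects.
      executeStatement(statement)
      println(s"returned after '$statement' completed")
    }

  def main(args: Array[String]): Unit = {
    run("SELECT 1", runInBackground = true)
    run("SELECT 1", runInBackground = false)
  }
}
```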


Author: Cheng Lian <lian@databricks.com>

Closes #3506 from liancheng/disable-async-exec and squashes the following commits:

593804d [Cheng Lian] Disables asynchronous execution in Hive 0.13.1 HiveThriftServer2
parent ceb62819
@@ -17,30 +17,25 @@
 package org.apache.spark.sql.hive.thriftserver
 
-import java.security.PrivilegedExceptionAction
 import java.sql.{Date, Timestamp}
-import java.util.concurrent.Future
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, Map => SMap}
 import scala.math._
 
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.plans.logical.SetCommand
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
+import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
 
 /**
  * A compatibility layer for interacting with Hive version 0.13.1.
@@ -48,7 +43,9 @@
 private[thriftserver] object HiveThriftServerShim {
   val version = "0.13.1"
 
-  def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = {
+  def setServerUserName(
+      sparkServiceUGI: UserGroupInformation,
+      sparkCliService:SparkSQLCLIService) = {
     setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)
   }
 }
@@ -72,39 +69,14 @@
     confOverlay: JMap[String, String],
     runInBackground: Boolean = true)(
    hiveContext: HiveContext,
-    sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
-  parentSession, statement, confOverlay, runInBackground) with Logging {
+    sessionToActivePool: SMap[HiveSession, String])
+  // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
+  extends ExecuteStatementOperation(parentSession, statement, confOverlay, false) with Logging {
 
   private var result: SchemaRDD = _
   private var iter: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
 
-  private def runInternal(cmd: String) = {
-    try {
-      result = hiveContext.sql(cmd)
-      logDebug(result.queryExecution.toString())
-      val groupId = round(random * 1000000).toString
-      hiveContext.sparkContext.setJobGroup(groupId, statement)
-      iter = {
-        val useIncrementalCollect =
-          hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
-        if (useIncrementalCollect) {
-          result.toLocalIterator
-        } else {
-          result.collect().iterator
-        }
-      }
-      dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
-    } catch {
-      // Actually do need to catch Throwable as some failures don't inherit from Exception and
-      // HiveServer will silently swallow them.
-      case e: Throwable =>
-        setState(OperationState.ERROR)
-        logError("Error executing query:",e)
-        throw new HiveSQLException(e.toString)
-    }
-  }
-
   def close(): Unit = {
     // RDDs will be cleaned automatically upon garbage collection.
     logDebug("CLOSING")
@@ -182,76 +154,43 @@
     }
   }
 
-  private def getConfigForOperation: HiveConf = {
-    var sqlOperationConf: HiveConf = getParentSession.getHiveConf
-    if (!getConfOverlay.isEmpty || shouldRunAsync) {
-      sqlOperationConf = new HiveConf(sqlOperationConf)
-      import scala.collection.JavaConversions._
-      for (confEntry <- getConfOverlay.entrySet) {
-        try {
-          sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue)
-        }
-        catch { case e: IllegalArgumentException =>
-          throw new HiveSQLException("Error applying statement specific settings", e)
-        }
-      }
-    }
-    sqlOperationConf
-  }
-
   def run(): Unit = {
     logInfo(s"Running query '$statement'")
-    val opConfig: HiveConf = getConfigForOperation
     setState(OperationState.RUNNING)
-    setHasResultSet(true)
-
-    if (!shouldRunAsync) {
-      runInternal(statement)
-      setState(OperationState.FINISHED)
-    } else {
-      val parentSessionState = SessionState.get
-      val sessionHive: Hive = Hive.get
-      val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig)
-
-      val backgroundOperation: Runnable = new Runnable {
-        def run() {
-          val doAsAction: PrivilegedExceptionAction[AnyRef] =
-            new PrivilegedExceptionAction[AnyRef] {
-              def run: AnyRef = {
-                Hive.set(sessionHive)
-                SessionState.setCurrentSessionState(parentSessionState)
-                try {
-                  runInternal(statement)
-                }
-                catch { case e: HiveSQLException =>
-                  setOperationException(e)
-                  logError("Error running hive query: ", e)
-                }
-                null
-              }
-            }
-
-          try {
-            ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction)
-          }
-          catch { case e: Exception =>
-            setOperationException(new HiveSQLException(e))
-            logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
-          }
-          setState(OperationState.FINISHED)
-        }
-      }
-
-      try {
-        val backgroundHandle: Future[_] = getParentSession.getSessionManager.
-          submitBackgroundOperation(backgroundOperation)
-        setBackgroundHandle(backgroundHandle)
-      } catch {
-        // Actually do need to catch Throwable as some failures don't inherit from Exception and
-        // HiveServer will silently swallow them.
-        case e: Throwable =>
-          logError("Error executing query:",e)
-          throw new HiveSQLException(e.toString)
-      }
+    try {
+      result = hiveContext.sql(statement)
+      logDebug(result.queryExecution.toString())
+      result.queryExecution.logical match {
+        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
+          sessionToActivePool(parentSession) = value
+          logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
+        case _ =>
+      }
+
+      val groupId = round(random * 1000000).toString
+      hiveContext.sparkContext.setJobGroup(groupId, statement)
+      sessionToActivePool.get(parentSession).foreach { pool =>
+        hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+      }
+      iter = {
+        val useIncrementalCollect =
+          hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
+        if (useIncrementalCollect) {
+          result.toLocalIterator
+        } else {
+          result.collect().iterator
+        }
+      }
+      dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
+      setHasResultSet(true)
+    } catch {
+      // Actually do need to catch Throwable as some failures don't inherit from Exception and
+      // HiveServer will silently swallow them.
+      case e: Throwable =>
+        setState(OperationState.ERROR)
+        logError("Error executing query:", e)
+        throw new HiveSQLException(e.toString)
     }
+    setState(OperationState.FINISHED)
   }
 }
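
For reference, a hedged client-side usage sketch (not part of this commit; the host, port, user, and pool name are assumptions, and the `SET` keys are taken from the code above, assuming `SQLConf.THRIFTSERVER_POOL` resolves to `spark.sql.thriftserver.scheduler.pool`): with asynchronous execution disabled, `executeQuery` only returns once the Spark job behind the statement has completed.

```scala
import java.sql.DriverManager

object ThriftServerClientSketch {
  def main(args: Array[String]): Unit = {
    // Hive JDBC driver speaking to HiveThriftServer2; connection details are assumptions.
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    try {
      val stmt = conn.createStatement()
      // Picked up by the SetCommand branch in run(): later statements from this session
      // are submitted to the named fair-scheduler pool.
      stmt.execute("SET spark.sql.thriftserver.scheduler.pool=etl")
      // Stream results with toLocalIterator instead of collect(), as read back by
      // hiveContext.getConf in run().
      stmt.execute("SET spark.sql.thriftServer.incrementalCollect=true")
      // Blocks until the query finishes because runInBackground is hard-coded to false.
      val rs = stmt.executeQuery("SELECT 1")
      while (rs.next()) println(rs.getInt(1))
    } finally {
      conn.close()
    }
  }
}
```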