Commit 4782def0 authored by carlmartin, committed by Michael Armbrust

[SPARK-4694] Fix HiveThriftServer2 not stopping in YARN HA mode.

HiveThriftServer2 cannot exit automatically when the ResourceManager fails over to the standby in YARN HA mode.
The scheduler backend notices that the AM has exited and calls sc.stop() to shut down the driver process, but the HiveThriftServer2 user thread is still alive, so the JVM never terminates.
The initial fix started a daemon thread that watched for the SparkContext being stopped and then called the Thrift server's stop() to end the user thread; the final change registers a SparkListener instead (see the sketch after the squashed-commit list below).

Author: carlmartin <carlmartinmax@gmail.com>

Closes #3576 from SaintBacchus/ThriftServer2ExitBug and squashes the following commits:

2890b4a [carlmartin] Use a SparkListener instead of the daemon thread to stop the Hive server.
c15da0e [carlmartin] HiveThriftServer2 cannot exit automatically when the ResourceManager fails over to the standby in YARN HA mode
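
For illustration, here is a minimal sketch of the pattern the final commit uses: a SparkListener registered on the SparkContext stops a long-running service when the application ends. This is a standalone sketch, not code from the patch; MyService is a hypothetical stand-in for HiveThriftServer2, and the local master is only for demonstration.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

// Hypothetical long-running service standing in for HiveThriftServer2.
class MyService {
  def start(): Unit = println("service started")
  def stop(): Unit = println("service stopped")
}

object ListenerShutdownSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))
    val service = new MyService
    service.start()

    // When sc.stop() runs (for example after the YARN backend detects that the AM is gone),
    // onApplicationEnd fires and the service is shut down along with the driver.
    sc.addSparkListener(new SparkListener {
      override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit = {
        service.stop()
      }
    })

    sc.stop()
  }
}

Compared with polling from a daemon thread, the listener ties the server's shutdown directly to the application-end event that sc.stop() publishes, so no busy-wait is needed.
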
parent 5fdcbdc0
@@ -27,6 +27,7 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener}
/**
* The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a
@@ -44,9 +45,9 @@ object HiveThriftServer2 extends Logging {
val server = new HiveThriftServer2(sqlContext)
server.init(sqlContext.hiveconf)
server.start()
sqlContext.sparkContext.addSparkListener(new HiveThriftServer2Listener(server))
}
def main(args: Array[String]) {
val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
if (!optionsProcessor.process(args)) {
@@ -69,12 +70,23 @@ object HiveThriftServer2 extends Logging {
server.init(SparkSQLEnv.hiveContext.hiveconf)
server.start()
logInfo("HiveThriftServer2 started")
SparkSQLEnv.sparkContext.addSparkListener(new HiveThriftServer2Listener(server))
} catch {
case e: Exception =>
logError("Error starting HiveThriftServer2", e)
System.exit(-1)
}
}
/**
* An inner SparkListener, called from sc.stop, that cleans up the HiveThriftServer2.
*/
class HiveThriftServer2Listener(val server: HiveServer2) extends SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
server.stop()
}
}
}
private[hive] class HiveThriftServer2(hiveContext: HiveContext)