diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index c5b73234fa1de762df8ac1fff4fa68f24979f2a7..6e07df18b0e153e92a439bec5d767fd307a2a59e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -27,6 +27,7 @@ import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ +import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener} /** * The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a @@ -44,9 +45,9 @@ object HiveThriftServer2 extends Logging { val server = new HiveThriftServer2(sqlContext) server.init(sqlContext.hiveconf) server.start() + sqlContext.sparkContext.addSparkListener(new HiveThriftServer2Listener(server)) } - def main(args: Array[String]) { val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2") if (!optionsProcessor.process(args)) { @@ -69,12 +70,23 @@ object HiveThriftServer2 extends Logging { server.init(SparkSQLEnv.hiveContext.hiveconf) server.start() logInfo("HiveThriftServer2 started") + SparkSQLEnv.sparkContext.addSparkListener(new HiveThriftServer2Listener(server)) } catch { case e: Exception => logError("Error starting HiveThriftServer2", e) System.exit(-1) } } + + /** + * A inner sparkListener called in sc.stop to clean up the HiveThriftServer2 + */ + class HiveThriftServer2Listener(val server: HiveServer2) extends SparkListener { + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + server.stop() + } + } + } private[hive] class HiveThriftServer2(hiveContext: HiveContext)