Skip to content
Snippets Groups Projects
Commit 7ff9311a authored by Denny's avatar Denny
Browse files

Add shutdown hook to Executor Runner and execute code to shutdown local...

Add shutdown hook to Executor Runner and execute code to shutdown local cluster in Scheduler Backend
parent 4e7b264c
No related branches found
No related tags found
No related merge requests found
...@@ -110,6 +110,9 @@ class SparkContext( ...@@ -110,6 +110,9 @@ class SparkContext(
val sparkUrl = localCluster.start() val sparkUrl = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName) val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
scheduler.initialize(backend) scheduler.initialize(backend)
backend.shutdownHook = (backend: SparkDeploySchedulerBackend) => {
localCluster.stop()
}
scheduler scheduler
case _ => case _ =>
......
...@@ -16,9 +16,11 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int, ...@@ -16,9 +16,11 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
val localIpAddress = Utils.localIpAddress val localIpAddress = Utils.localIpAddress
var masterActor : ActorRef = _ var masterActor : ActorRef = _
var masterActorSystem : ActorSystem = _
var masterPort : Int = _ var masterPort : Int = _
var masterUrl : String = _ var masterUrl : String = _
val slaveActorSystems = ArrayBuffer[ActorSystem]()
val slaveActors = ArrayBuffer[ActorRef]() val slaveActors = ArrayBuffer[ActorRef]()
def start() : String = { def start() : String = {
...@@ -41,6 +43,7 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int, ...@@ -41,6 +43,7 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
(1 to numSlaves).foreach { slaveNum => (1 to numSlaves).foreach { slaveNum =>
val (actorSystem, boundPort) = val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0) AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
slaveActorSystems += actorSystem
threadPool.execute(new Runnable { threadPool.execute(new Runnable {
def run() { def run() {
val actor = actorSystem.actorOf( val actor = actorSystem.actorOf(
...@@ -52,22 +55,14 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int, ...@@ -52,22 +55,14 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
}) })
} }
// Shutdown hook that kills actors on shutdown.
Runtime.getRuntime.addShutdownHook(
new Thread() {
override def run() {
masterActorSystem.stop(masterActor)
masterActorSystem.shutdown()
// Since above is asynchronous wait for the master actor to shut down
while(!masterActor.isTerminated) {
Thread.sleep(10)
}
}
})
return masterUrl return masterUrl
} }
def stop() {
logInfo("Shutting down local Spark cluster.")
masterActorSystem.shutdown()
slaveActorSystems.foreach(_.shutdown())
}
} }
\ No newline at end of file
...@@ -35,6 +35,19 @@ class ExecutorRunner( ...@@ -35,6 +35,19 @@ class ExecutorRunner(
override def run() { fetchAndRunExecutor() } override def run() { fetchAndRunExecutor() }
} }
workerThread.start() workerThread.start()
// Shutdown hook that kills actors on shutdown.
Runtime.getRuntime.addShutdownHook(
new Thread() {
override def run() {
if(process != null) {
logInfo("Shutdown Hook killing process.")
process.destroy()
process.waitFor()
}
}
})
} }
/** Stop this executor runner, including killing the process it launched */ /** Stop this executor runner, including killing the process it launched */
......
...@@ -16,6 +16,7 @@ class SparkDeploySchedulerBackend( ...@@ -16,6 +16,7 @@ class SparkDeploySchedulerBackend(
var client: Client = null var client: Client = null
var stopping = false var stopping = false
var shutdownHook : (SparkDeploySchedulerBackend) => Unit = _
val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
...@@ -61,6 +62,9 @@ class SparkDeploySchedulerBackend( ...@@ -61,6 +62,9 @@ class SparkDeploySchedulerBackend(
stopping = true; stopping = true;
super.stop() super.stop()
client.stop() client.stop()
if (shutdownHook != null) {
shutdownHook(this)
}
} }
def connected(jobId: String) { def connected(jobId: String) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment