Skip to content
Snippets Groups Projects
Commit 4e7b264c authored by Denny's avatar Denny
Browse files

Set SPARK_LAUNCH_WITH_SCALA=0 in Executor Runner

parent 886183e5
No related branches found
No related tags found
No related merge requests found
......@@ -26,14 +26,14 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
/* Start the Master */
val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
val (masterActorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
masterUrl = "spark://" + localIpAddress + ":" + masterPort
threadPool.execute(new Runnable {
def run() {
val actor = actorSystem.actorOf(
val actor = masterActorSystem.actorOf(
Props(new Master(localIpAddress, masterPort, 8080)), name = "Master")
masterActor = actor
actorSystem.awaitTermination()
masterActorSystem.awaitTermination()
}
})
......@@ -52,6 +52,20 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
})
}
// Shutdown hook that kills actors on shutdown.
Runtime.getRuntime.addShutdownHook(
new Thread() {
override def run() {
masterActorSystem.stop(masterActor)
masterActorSystem.shutdown()
// Since above is asynchronous wait for the master actor to shut down
while(!masterActor.isTerminated) {
Thread.sleep(10)
}
}
})
return masterUrl
}
......
......@@ -131,6 +131,9 @@ class ExecutorRunner(
}
env.put("SPARK_CORES", cores.toString)
env.put("SPARK_MEMORY", memory.toString)
// In case we are running this from within the Spark Shell
// so we are not creating a parent process.
env.put("SPARK_LAUNCH_WITH_SCALA", "0")
process = builder.start()
// Redirect its stdout and stderr to files
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment