Commit 6415c2bb authored by Stephen Haberman

Don't create the Executor until we have everything it needs.

parent 80eecd2c
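The change below replaces two-phase construction (build an empty Executor, then call initialize() once the backend has registered, leaving the @volatile fields null in the meantime) with constructor injection: each backend now constructs the Executor only after it has the executor ID, hostname, and Spark properties in hand, so a fully initialized instance is the only kind that can exist. Below is a minimal sketch of the two patterns, using simplified stand-in names rather than the actual Spark classes.

// Before: two-phase construction. Fields stay null until initialize() runs,
// so any call that sneaks in earlier hits a NullPointerException.
class LateInitExecutor {
  @volatile private var threadPool: java.util.concurrent.ExecutorService = null

  def initialize(executorId: String, hostname: String): Unit = {
    threadPool = java.util.concurrent.Executors.newCachedThreadPool()
  }

  def launchTask(task: Runnable): Unit = threadPool.execute(task)
}

// After: everything the executor needs is a constructor argument, so the
// instance is usable the moment it exists and the mutable vars become vals.
class ConstructorInitExecutor(
    executorId: String,
    hostname: String,
    properties: Seq[(String, String)]) {

  for ((key, value) <- properties) System.setProperty(key, value)

  private val threadPool = java.util.concurrent.Executors.newCachedThreadPool()

  def launchTask(task: Runnable): Unit = threadPool.execute(task)
}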
Executor.scala

@@ -16,11 +16,8 @@ import java.nio.ByteBuffer
 /**
  * The Mesos executor for Spark.
  */
-private[spark] class Executor extends Logging {
-  @volatile private var urlClassLoader : ExecutorURLClassLoader = null
-  @volatile private var threadPool: ExecutorService = null
-  @volatile private var env: SparkEnv = null
+private[spark] class Executor(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) extends Logging {
 
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
   // Each map holds the master's timestamp for the version of that file or JAR we got.
   private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
@@ -30,52 +27,50 @@ private[spark] class Executor extends Logging {
   initLogging()
 
-  def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) {
   // Make sure the local hostname we report matches the cluster scheduler's name for this host
   Utils.setCustomHostname(slaveHostname)
 
   // Set spark.* system properties from executor arg
   for ((key, value) <- properties) {
     System.setProperty(key, value)
   }
 
   // Create our ClassLoader and set it on this thread
-  urlClassLoader = createClassLoader()
+  private val urlClassLoader = createClassLoader()
   Thread.currentThread.setContextClassLoader(urlClassLoader)
 
   // Make any thread terminations due to uncaught exceptions kill the entire
   // executor process to avoid surprising stalls.
   Thread.setDefaultUncaughtExceptionHandler(
     new Thread.UncaughtExceptionHandler {
       override def uncaughtException(thread: Thread, exception: Throwable) {
         try {
           logError("Uncaught exception in thread " + thread, exception)
 
           // We may have been called from a shutdown hook. If so, we must not call System.exit().
           // (If we do, we will deadlock.)
           if (!Utils.inShutdown()) {
             if (exception.isInstanceOf[OutOfMemoryError]) {
               System.exit(ExecutorExitCode.OOM)
             } else {
               System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
             }
           }
         } catch {
           case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
           case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
         }
       }
     }
   )
 
   // Initialize Spark environment (using system properties read above)
-  env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
+  val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
   SparkEnv.set(env)
 
   // Start worker thread pool
-  threadPool = new ThreadPoolExecutor(
+  val threadPool = new ThreadPoolExecutor(
     1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-  }
 
   def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     threadPool.execute(new TaskRunner(context, taskId, serializedTask))
...
MesosExecutorBackend.scala

@@ -8,11 +8,12 @@ import com.google.protobuf.ByteString
 import spark.{Utils, Logging}
 import spark.TaskState
 
-private[spark] class MesosExecutorBackend(executor: Executor)
+private[spark] class MesosExecutorBackend
   extends MesosExecutor
   with ExecutorBackend
   with Logging {
 
+  var executor: Executor = null
   var driver: ExecutorDriver = null
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
@@ -32,16 +33,19 @@ private[spark] class MesosExecutorBackend(executor: Executor)
     logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
     this.driver = driver
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
-    executor.initialize(
+    executor = new Executor(
       executorInfo.getExecutorId.getValue,
       slaveInfo.getHostname,
-      properties
-    )
+      properties)
   }
 
   override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
     val taskId = taskInfo.getTaskId.getValue.toLong
-    executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+    if (executor == null) {
+      logError("Received launchTask but executor was null")
+    } else {
+      executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+    }
   }
 
   override def error(d: ExecutorDriver, message: String) {
@@ -68,7 +72,7 @@ private[spark] object MesosExecutorBackend {
   def main(args: Array[String]) {
     MesosNativeLibrary.load()
     // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend(new Executor)
+    val runner = new MesosExecutorBackend()
     new MesosExecutorDriver(runner).run()
   }
 }
StandaloneExecutorBackend.scala

@@ -14,7 +14,6 @@ import spark.scheduler.cluster.RegisterExecutorFailed
 import spark.scheduler.cluster.RegisterExecutor
 
 private[spark] class StandaloneExecutorBackend(
-    executor: Executor,
     driverUrl: String,
     executorId: String,
     hostname: String,
@@ -23,6 +22,7 @@ private[spark] class StandaloneExecutorBackend(
   with ExecutorBackend
   with Logging {
 
+  var executor: Executor = null
   var driver: ActorRef = null
 
   override def preStart() {
@@ -36,7 +36,7 @@ private[spark] class StandaloneExecutorBackend(
   override def receive = {
     case RegisteredExecutor(sparkProperties) =>
       logInfo("Successfully registered with driver")
-      executor.initialize(executorId, hostname, sparkProperties)
+      executor = new Executor(executorId, hostname, sparkProperties)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -44,7 +44,12 @@ private[spark] class StandaloneExecutorBackend(
     case LaunchTask(taskDesc) =>
       logInfo("Got assigned task " + taskDesc.taskId)
-      executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+      if (executor == null) {
+        logError("Received launchTask but executor was null")
+        System.exit(1)
+      } else {
+        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+      }
 
     case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
       logError("Driver terminated or disconnected! Shutting down.")
@@ -62,7 +67,7 @@ private[spark] object StandaloneExecutorBackend {
     // before getting started with all our system properties, etc
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
     val actor = actorSystem.actorOf(
-      Props(new StandaloneExecutorBackend(new Executor, driverUrl, executorId, hostname, cores)),
+      Props(new StandaloneExecutorBackend(driverUrl, executorId, hostname, cores)),
       name = "Executor")
     actorSystem.awaitTermination()
   }
...
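Both backends now follow the same shape: hold a nullable executor reference, construct the real Executor only when registration delivers its constructor arguments, and reject (with a logged error rather than a NullPointerException) any task that arrives before that point. A condensed, self-contained sketch of that flow, again with hypothetical simplified names:

object BackendSketch {
  // Stand-in for the real Executor; fully initialized by its constructor.
  final class SimpleExecutor(executorId: String, hostname: String, properties: Seq[(String, String)]) {
    private val pool = java.util.concurrent.Executors.newCachedThreadPool()
    def launchTask(task: Runnable): Unit = pool.execute(task)
  }

  class Backend {
    @volatile private var executor: SimpleExecutor = null

    // Called once registration has supplied everything the executor's constructor needs.
    def onRegistered(executorId: String, hostname: String, properties: Seq[(String, String)]): Unit = {
      executor = new SimpleExecutor(executorId, hostname, properties)
    }

    // A task arriving before registration is rejected loudly instead of dereferencing null.
    def onLaunchTask(task: Runnable): Unit = {
      if (executor == null) {
        System.err.println("Received launchTask but executor was null")
      } else {
        executor.launchTask(task)
      }
    }
  }
}

The standalone backend goes one step further and calls System.exit(1) in that branch, presumably because a task arriving before RegisteredExecutor means the driver and this backend are out of sync.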