Commit b8949cab authored by Matei Zaharia

Merge pull request #505 from stephenh/volatile

Make Executor fields volatile since they're read from the thread pool.
parents fd53f2fc 4f421531
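
Why volatile matters here: the Executor's fields are assigned on one thread (the backend's registration path) but read by worker threads drawn from the task thread pool. Without a happens-before edge such as volatile or final, the Java memory model permits a pool thread to observe a stale null even after the field has been assigned. A minimal sketch of that hazard, with illustrative names that are not Spark's:

import java.util.concurrent.Executors

// A minimal sketch of the visibility hazard (illustrative names, not Spark's).
class Backend {
  // Without @volatile, the JMM allows a pool thread to keep seeing the
  // initial null here even after the registration thread assigns the field.
  @volatile var executor: String = null

  private val pool = Executors.newCachedThreadPool()

  // Called on the registration thread.
  def onRegistered() { executor = "ready" }

  // Called later; the body runs on a pool thread.
  def onLaunchTask() {
    pool.execute(new Runnable {
      def run() {
        // @volatile guarantees this read sees the registration write.
        if (executor == null) sys.error("stale read: executor not visible")
      }
    })
  }
}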
Executor.scala
@@ -16,66 +16,61 @@ import java.nio.ByteBuffer
 /**
  * The Mesos executor for Spark.
  */
-private[spark] class Executor extends Logging {
-  var urlClassLoader : ExecutorURLClassLoader = null
-  var threadPool: ExecutorService = null
-  var env: SparkEnv = null
+private[spark] class Executor(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) extends Logging {
 
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
   // Each map holds the master's timestamp for the version of that file or JAR we got.
-  val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
-  val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
+  private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
+  private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
 
-  val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
+  private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
 
   initLogging()
 
-  def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) {
-    // Make sure the local hostname we report matches the cluster scheduler's name for this host
-    Utils.setCustomHostname(slaveHostname)
+  // Make sure the local hostname we report matches the cluster scheduler's name for this host
+  Utils.setCustomHostname(slaveHostname)
 
-    // Set spark.* system properties from executor arg
-    for ((key, value) <- properties) {
-      System.setProperty(key, value)
-    }
+  // Set spark.* system properties from executor arg
+  for ((key, value) <- properties) {
+    System.setProperty(key, value)
+  }
 
-    // Create our ClassLoader and set it on this thread
-    urlClassLoader = createClassLoader()
-    Thread.currentThread.setContextClassLoader(urlClassLoader)
+  // Create our ClassLoader and set it on this thread
+  private val urlClassLoader = createClassLoader()
+  Thread.currentThread.setContextClassLoader(urlClassLoader)
 
-    // Make any thread terminations due to uncaught exceptions kill the entire
-    // executor process to avoid surprising stalls.
-    Thread.setDefaultUncaughtExceptionHandler(
-      new Thread.UncaughtExceptionHandler {
-        override def uncaughtException(thread: Thread, exception: Throwable) {
-          try {
-            logError("Uncaught exception in thread " + thread, exception)
-            // We may have been called from a shutdown hook. If so, we must not call System.exit().
-            // (If we do, we will deadlock.)
-            if (!Utils.inShutdown()) {
-              if (exception.isInstanceOf[OutOfMemoryError]) {
-                System.exit(ExecutorExitCode.OOM)
-              } else {
-                System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
-              }
-            }
-          } catch {
-            case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-            case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
-          }
-        }
-      }
-    )
+  // Make any thread terminations due to uncaught exceptions kill the entire
+  // executor process to avoid surprising stalls.
+  Thread.setDefaultUncaughtExceptionHandler(
+    new Thread.UncaughtExceptionHandler {
+      override def uncaughtException(thread: Thread, exception: Throwable) {
+        try {
+          logError("Uncaught exception in thread " + thread, exception)
+          // We may have been called from a shutdown hook. If so, we must not call System.exit().
+          // (If we do, we will deadlock.)
+          if (!Utils.inShutdown()) {
+            if (exception.isInstanceOf[OutOfMemoryError]) {
+              System.exit(ExecutorExitCode.OOM)
+            } else {
+              System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+            }
+          }
+        } catch {
+          case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+          case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+        }
+      }
+    }
+  )
 
-    // Initialize Spark environment (using system properties read above)
-    env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
-    SparkEnv.set(env)
+  // Initialize Spark environment (using system properties read above)
+  val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
+  SparkEnv.set(env)
 
-    // Start worker thread pool
-    threadPool = new ThreadPoolExecutor(
-      1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-  }
+  // Start worker thread pool
+  val threadPool = new ThreadPoolExecutor(
+    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
 
   def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     threadPool.execute(new TaskRunner(context, taskId, serializedTask))
......
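
The Executor change replaces two-phase initialization (construct, then call initialize()) with constructor arguments, so urlClassLoader, env, and threadPool become vals. A Scala val compiles to a final field, and the JMM guarantees final fields are visible to all threads once the constructor completes, provided this does not escape during construction. A condensed, hypothetical before/after of the pattern:

import java.util.concurrent.{Executors, ExecutorService}

// Before: two-phase init. The var starts as null and is filled in later, so
// another thread can observe the null unless the field is @volatile or the
// handoff is otherwise synchronized.
class TwoPhase {
  var threadPool: ExecutorService = null
  def initialize() {
    threadPool = Executors.newCachedThreadPool()
  }
}

// After: single-phase init. A val compiles to a final field, which the JMM
// publishes safely once the constructor finishes (assuming `this` does not
// escape during construction).
class OnePhase {
  val threadPool: ExecutorService = Executors.newCachedThreadPool()
}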
MesosExecutorBackend.scala
@@ -8,11 +8,12 @@ import com.google.protobuf.ByteString
 import spark.{Utils, Logging}
 import spark.TaskState
 
-private[spark] class MesosExecutorBackend(executor: Executor)
+private[spark] class MesosExecutorBackend
   extends MesosExecutor
   with ExecutorBackend
   with Logging {
 
+  var executor: Executor = null
   var driver: ExecutorDriver = null
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
@@ -32,16 +33,19 @@ private[spark] class MesosExecutorBackend(executor: Executor)
     logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
     this.driver = driver
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
-    executor.initialize(
+    executor = new Executor(
       executorInfo.getExecutorId.getValue,
       slaveInfo.getHostname,
-      properties
-    )
+      properties)
   }
 
   override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
     val taskId = taskInfo.getTaskId.getValue.toLong
-    executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+    if (executor == null) {
+      logError("Received launchTask but executor was null")
+    } else {
+      executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+    }
   }
 
   override def error(d: ExecutorDriver, message: String) {
@@ -68,7 +72,7 @@ private[spark] object MesosExecutorBackend {
   def main(args: Array[String]) {
     MesosNativeLibrary.load()
     // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend(new Executor)
+    val runner = new MesosExecutorBackend()
     new MesosExecutorDriver(runner).run()
   }
 }
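
In the Mesos backend, registered() and launchTask() are invoked from the Mesos driver's own threads, so launchTask can in principle arrive before registration has finished constructing the Executor; the null check turns that ordering violation into a logged error rather than a NullPointerException. A sketch of the same guard with illustrative names, adding the read-into-a-local idiom that avoids testing and dereferencing the field in two separate reads:

// Sketch of the guard above (hypothetical names, not Spark's code).
class CallbackBackend {
  @volatile private var worker: Runnable = null  // written and read on different driver threads

  def registered() {
    worker = new Runnable { def run() { println("task ran") } }
  }

  def launchTask() {
    val w = worker  // read once; the field cannot change between test and use
    if (w == null) {
      println("Received launchTask but executor was null")
    } else {
      w.run()
    }
  }
}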
StandaloneExecutorBackend.scala
@@ -14,7 +14,6 @@ import spark.scheduler.cluster.RegisterExecutorFailed
 import spark.scheduler.cluster.RegisterExecutor
 
 private[spark] class StandaloneExecutorBackend(
-    executor: Executor,
     driverUrl: String,
     executorId: String,
     hostname: String,
@@ -23,6 +22,7 @@ private[spark] class StandaloneExecutorBackend(
   with ExecutorBackend
   with Logging {
 
+  var executor: Executor = null
   var driver: ActorRef = null
 
   override def preStart() {
@@ -36,7 +36,7 @@ private[spark] class StandaloneExecutorBackend(
   override def receive = {
     case RegisteredExecutor(sparkProperties) =>
       logInfo("Successfully registered with driver")
-      executor.initialize(executorId, hostname, sparkProperties)
+      executor = new Executor(executorId, hostname, sparkProperties)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -44,7 +44,12 @@ private[spark] class StandaloneExecutorBackend(
 
     case LaunchTask(taskDesc) =>
       logInfo("Got assigned task " + taskDesc.taskId)
-      executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+      if (executor == null) {
+        logError("Received launchTask but executor was null")
+        System.exit(1)
+      } else {
+        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+      }
 
     case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
       logError("Driver terminated or disconnected! Shutting down.")
@@ -62,7 +67,7 @@ private[spark] object StandaloneExecutorBackend {
     // before getting started with all our system properties, etc
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
     val actor = actorSystem.actorOf(
      Props(new StandaloneExecutorBackend(new Executor, driverUrl, executorId, hostname, cores)),
      Props(new StandaloneExecutorBackend(driverUrl, executorId, hostname, cores)),
       name = "Executor")
     actorSystem.awaitTermination()
   }
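
The standalone backend differs in two ways: it exits outright on a null executor, since the process is useless without one, and the executor field itself needs no extra memory barrier, because both the write (in RegisteredExecutor) and the read (in LaunchTask) happen inside receive, and an Akka actor processes messages one at a time with a happens-before edge between them. Only the ordering case (a launch arriving before registration) still needs the null check. A minimal sketch of that property, with hypothetical names:

import akka.actor.{Actor, ActorSystem, Props}

// An actor handles its mailbox one message at a time, so a plain var
// written while handling one message is visible while handling the next.
class BackendLike extends Actor {
  var executor: String = null
  def receive = {
    case "registered" => executor = "ready"
    case "launch" =>
      if (executor == null) println("launch arrived before registration")
      else println("launching via " + executor)
  }
}

// Usage: messages from a given sender are handled in send order.
//   val system = ActorSystem("demo")
//   val backend = system.actorOf(Props(new BackendLike), name = "backend")
//   backend ! "registered"
//   backend ! "launch"    // prints "launching via ready"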