From 81720e13fc9e1f475dd1333babfa08f3f806a5d0 Mon Sep 17 00:00:00 2001 From: Reynold Xin <reynoldx@gmail.com> Date: Mon, 29 Jul 2013 17:53:01 -0700 Subject: [PATCH] Moved all StandaloneClusterMessage's into StandaloneClusterMessages object. --- .../executor/StandaloneExecutorBackend.scala | 17 +++--- .../cluster/StandaloneClusterMessage.scala | 61 ++++++++++--------- .../cluster/StandaloneSchedulerBackend.scala | 9 +-- 3 files changed, 43 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index f4003da732..e47fe50021 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -18,19 +18,16 @@ package spark.executor import java.nio.ByteBuffer -import spark.Logging -import spark.TaskState.TaskState -import spark.util.AkkaUtils + import akka.actor.{ActorRef, Actor, Props, Terminated} import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} -import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue} -import spark.scheduler.cluster._ -import spark.scheduler.cluster.RegisteredExecutor -import spark.scheduler.cluster.LaunchTask -import spark.scheduler.cluster.RegisterExecutorFailed -import spark.scheduler.cluster.RegisterExecutor -import spark.Utils + +import spark.{Logging, Utils} +import spark.TaskState.TaskState import spark.deploy.SparkHadoopUtil +import spark.scheduler.cluster.StandaloneClusterMessages._ +import spark.util.AkkaUtils + private[spark] class StandaloneExecutorBackend( driverUrl: String, diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala index ac9e5ef94d..05c29eb72f 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala @@ -17,46 +17,47 @@ package spark.scheduler.cluster -import spark.TaskState.TaskState import java.nio.ByteBuffer -import spark.util.SerializableBuffer + +import spark.TaskState.TaskState import spark.Utils +import spark.util.SerializableBuffer + private[spark] sealed trait StandaloneClusterMessage extends Serializable -// Driver to executors -private[spark] -case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage +private[spark] object StandaloneClusterMessages { -private[spark] -case class RegisteredExecutor(sparkProperties: Seq[(String, String)]) - extends StandaloneClusterMessage + // Driver to executors + case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage -private[spark] -case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage + case class RegisteredExecutor(sparkProperties: Seq[(String, String)]) + extends StandaloneClusterMessage -// Executors to driver -private[spark] -case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) - extends StandaloneClusterMessage { - Utils.checkHostPort(hostPort, "Expected host port") -} + case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage -private[spark] -case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer) - extends StandaloneClusterMessage + // Executors to driver + case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) + extends StandaloneClusterMessage { + Utils.checkHostPort(hostPort, "Expected host port") + } + + case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, + data: SerializableBuffer) extends StandaloneClusterMessage -private[spark] -object StatusUpdate { - /** Alternate factory method that takes a ByteBuffer directly for the data field */ - def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = { - StatusUpdate(executorId, taskId, state, new SerializableBuffer(data)) + object StatusUpdate { + /** Alternate factory method that takes a ByteBuffer directly for the data field */ + def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer) + : StatusUpdate = { + StatusUpdate(executorId, taskId, state, new SerializableBuffer(data)) + } } -} -// Internal messages in driver -private[spark] case object ReviveOffers extends StandaloneClusterMessage -private[spark] case object StopDriver extends StandaloneClusterMessage + // Internal messages in driver + case object ReviveOffers extends StandaloneClusterMessage -private[spark] case class RemoveExecutor(executorId: String, reason: String) - extends StandaloneClusterMessage + case object StopDriver extends StandaloneClusterMessage + + case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage + +} diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 03a64e0192..075a7cbf7e 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -17,17 +17,18 @@ package spark.scheduler.cluster +import java.util.concurrent.atomic.AtomicInteger + import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import akka.actor._ -import akka.util.duration._ +import akka.dispatch.Await import akka.pattern.ask +import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} import akka.util.Duration import spark.{Utils, SparkException, Logging, TaskState} -import akka.dispatch.Await -import java.util.concurrent.atomic.AtomicInteger -import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} +import spark.scheduler.cluster.StandaloneClusterMessages._ /** * A standalone scheduler backend, which waits for standalone executors to connect to it through -- GitLab