Skip to content
Snippets Groups Projects
Commit 81720e13 authored by Reynold Xin's avatar Reynold Xin
Browse files

Moved all StandaloneClusterMessage's into StandaloneClusterMessages object.

parent 23b5da14
No related branches found
No related tags found
No related merge requests found
...@@ -18,19 +18,16 @@ ...@@ -18,19 +18,16 @@
package spark.executor package spark.executor
import java.nio.ByteBuffer import java.nio.ByteBuffer
import spark.Logging
import spark.TaskState.TaskState
import spark.util.AkkaUtils
import akka.actor.{ActorRef, Actor, Props, Terminated} import akka.actor.{ActorRef, Actor, Props, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
import spark.scheduler.cluster._ import spark.{Logging, Utils}
import spark.scheduler.cluster.RegisteredExecutor import spark.TaskState.TaskState
import spark.scheduler.cluster.LaunchTask
import spark.scheduler.cluster.RegisterExecutorFailed
import spark.scheduler.cluster.RegisterExecutor
import spark.Utils
import spark.deploy.SparkHadoopUtil import spark.deploy.SparkHadoopUtil
import spark.scheduler.cluster.StandaloneClusterMessages._
import spark.util.AkkaUtils
private[spark] class StandaloneExecutorBackend( private[spark] class StandaloneExecutorBackend(
driverUrl: String, driverUrl: String,
......
...@@ -17,46 +17,47 @@ ...@@ -17,46 +17,47 @@
package spark.scheduler.cluster package spark.scheduler.cluster
import spark.TaskState.TaskState
import java.nio.ByteBuffer import java.nio.ByteBuffer
import spark.util.SerializableBuffer
import spark.TaskState.TaskState
import spark.Utils import spark.Utils
import spark.util.SerializableBuffer
private[spark] sealed trait StandaloneClusterMessage extends Serializable private[spark] sealed trait StandaloneClusterMessage extends Serializable
// Driver to executors private[spark] object StandaloneClusterMessages {
private[spark]
case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
private[spark] // Driver to executors
case class RegisteredExecutor(sparkProperties: Seq[(String, String)]) case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
extends StandaloneClusterMessage
private[spark] case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage extends StandaloneClusterMessage
// Executors to driver case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
private[spark]
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
extends StandaloneClusterMessage {
Utils.checkHostPort(hostPort, "Expected host port")
}
private[spark] // Executors to driver
case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer) case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
extends StandaloneClusterMessage extends StandaloneClusterMessage {
Utils.checkHostPort(hostPort, "Expected host port")
}
case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
data: SerializableBuffer) extends StandaloneClusterMessage
private[spark] object StatusUpdate {
object StatusUpdate { /** Alternate factory method that takes a ByteBuffer directly for the data field */
/** Alternate factory method that takes a ByteBuffer directly for the data field */ def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer)
def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = { : StatusUpdate = {
StatusUpdate(executorId, taskId, state, new SerializableBuffer(data)) StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
}
} }
}
// Internal messages in driver // Internal messages in driver
private[spark] case object ReviveOffers extends StandaloneClusterMessage case object ReviveOffers extends StandaloneClusterMessage
private[spark] case object StopDriver extends StandaloneClusterMessage
private[spark] case class RemoveExecutor(executorId: String, reason: String) case object StopDriver extends StandaloneClusterMessage
extends StandaloneClusterMessage
case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage
}
...@@ -17,17 +17,18 @@ ...@@ -17,17 +17,18 @@
package spark.scheduler.cluster package spark.scheduler.cluster
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import akka.actor._ import akka.actor._
import akka.util.duration._ import akka.dispatch.Await
import akka.pattern.ask import akka.pattern.ask
import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
import akka.util.Duration import akka.util.Duration
import spark.{Utils, SparkException, Logging, TaskState} import spark.{Utils, SparkException, Logging, TaskState}
import akka.dispatch.Await import spark.scheduler.cluster.StandaloneClusterMessages._
import java.util.concurrent.atomic.AtomicInteger
import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
/** /**
* A standalone scheduler backend, which waits for standalone executors to connect to it through * A standalone scheduler backend, which waits for standalone executors to connect to it through
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment