Skip to content
Snippets Groups Projects
Commit 871bc168 authored by jerryshao's avatar jerryshao
Browse files

Add Executor instrumentation

parent 576528f0
No related branches found
No related tags found
No related merge requests found
package spark.executor
import com.codahale.metrics.{Gauge, MetricRegistry}
import spark.metrics.source.Source
class ExecutorInstrumentation(val executor: Option[Executor]) extends Source{
val metricRegistry = new MetricRegistry()
val sourceName = "executor"
// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "active_task", "number"),
new Gauge[Int] {
override def getValue: Int = executor.map(_.threadPool.getActiveCount()).getOrElse(0)
})
// Gauge for executor thread pool's approximate total number of tasks that have been completed
metricRegistry.register(MetricRegistry.name("threadpool", "complete_task", "count"),
new Gauge[Long] {
override def getValue: Long = executor.map(_.threadPool.getCompletedTaskCount()).getOrElse(0)
})
// Gauge for executor thread pool's current number of threads
metricRegistry.register(MetricRegistry.name("threadpool", "current_pool", "size"),
new Gauge[Int] {
override def getValue: Int = executor.map(_.threadPool.getPoolSize()).getOrElse(0)
})
// Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
metricRegistry.register(MetricRegistry.name("threadpool", "max_pool", "size"),
new Gauge[Int] {
override def getValue: Int = executor.map(_.threadPool.getMaximumPoolSize()).getOrElse(0)
})
}
\ No newline at end of file
......@@ -24,6 +24,7 @@ import spark.TaskState.TaskState
import com.google.protobuf.ByteString
import spark.{Utils, Logging}
import spark.TaskState
import spark.metrics.MetricsSystem
private[spark] class MesosExecutorBackend
extends MesosExecutor
......@@ -32,6 +33,9 @@ private[spark] class MesosExecutorBackend
var executor: Executor = null
var driver: ExecutorDriver = null
val executorInstrumentation = new ExecutorInstrumentation(Option(executor))
MesosExecutorBackend.metricsSystem.start()
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
......@@ -79,13 +83,17 @@ private[spark] class MesosExecutorBackend
override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
override def shutdown(d: ExecutorDriver) {}
override def shutdown(d: ExecutorDriver) {
MesosExecutorBackend.metricsSystem.stop()
}
}
/**
* Entry point for Mesos executor.
*/
private[spark] object MesosExecutorBackend {
private val metricsSystem = MetricsSystem.createMetricsSystem("executor")
def main(args: Array[String]) {
MesosNativeLibrary.load()
// Create a new Executor and start it running
......
......@@ -24,6 +24,7 @@ import spark.util.AkkaUtils
import akka.actor.{ActorRef, Actor, Props, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
import spark.metrics.MetricsSystem
import spark.scheduler.cluster._
import spark.scheduler.cluster.RegisteredExecutor
import spark.scheduler.cluster.LaunchTask
......@@ -45,6 +46,8 @@ private[spark] class StandaloneExecutorBackend(
var executor: Executor = null
var driver: ActorRef = null
val executorInstrumentation = new ExecutorInstrumentation(Option(executor))
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
......@@ -52,6 +55,9 @@ private[spark] class StandaloneExecutorBackend(
driver ! RegisterExecutor(executorId, hostPort, cores)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(driver) // Doesn't work with remote actors, but useful for testing
StandaloneExecutorBackend.metricsSystem.registerSource(executorInstrumentation)
StandaloneExecutorBackend.metricsSystem.start()
}
override def receive = {
......@@ -81,9 +87,15 @@ private[spark] class StandaloneExecutorBackend(
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
driver ! StatusUpdate(executorId, taskId, state, data)
}
override def postStop() {
StandaloneExecutorBackend.metricsSystem.stop()
}
}
private[spark] object StandaloneExecutorBackend {
private val metricsSystem = MetricsSystem.createMetricsSystem("executor")
def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
SparkHadoopUtil.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores))
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment