Skip to content
Snippets Groups Projects
Commit 9f1f9b10 authored by jerryshao's avatar jerryshao Committed by Andrew Or
Browse files

[SPARK-7007] [CORE] Add a metric source for ExecutorAllocationManager

Add a metric source to expose the internal status of ExecutorAllocationManager to better monitoring the resource usage of executors when dynamic allocation is enable. Please help to review, thanks a lot.

Author: jerryshao <saisai.shao@intel.com>

Closes #5589 from jerryshao/dynamic-allocation-source and squashes the following commits:

104d155 [jerryshao] rebase and address the comments
c501a2c [jerryshao] Address the comments
d237ba5 [jerryshao] Address the comments
2c3540f [jerryshao] Add a metric source for ExecutorAllocationManager
parent 57e9f29e
No related branches found
No related tags found
No related merge requests found
...@@ -21,7 +21,10 @@ import java.util.concurrent.TimeUnit ...@@ -21,7 +21,10 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable import scala.collection.mutable
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.scheduler._ import org.apache.spark.scheduler._
import org.apache.spark.metrics.source.Source
import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils} import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
/** /**
...@@ -144,6 +147,9 @@ private[spark] class ExecutorAllocationManager( ...@@ -144,6 +147,9 @@ private[spark] class ExecutorAllocationManager(
private val executor = private val executor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation") ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
// Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
val executorAllocationManagerSource = new ExecutorAllocationManagerSource
/** /**
* Verify that the settings specified through the config are valid. * Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception. * If not, throw an appropriate exception.
...@@ -579,6 +585,29 @@ private[spark] class ExecutorAllocationManager( ...@@ -579,6 +585,29 @@ private[spark] class ExecutorAllocationManager(
} }
} }
/**
* Metric source for ExecutorAllocationManager to expose its internal executor allocation
* status to MetricsSystem.
* Note: These metrics heavily rely on the internal implementation of
* ExecutorAllocationManager, metrics or value of metrics will be changed when internal
* implementation is changed, so these metrics are not stable across Spark version.
*/
private[spark] class ExecutorAllocationManagerSource extends Source {
val sourceName = "ExecutorAllocationManager"
val metricRegistry = new MetricRegistry()
private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = {
metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] {
override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) }
})
}
registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0)
registerGauge("numberExecutorsPendingToRemove", executorsPendingToRemove.size, 0)
registerGauge("numberAllExecutors", executorIds.size, 0)
registerGauge("numberTargetExecutors", numExecutorsTarget, 0)
registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0)
}
} }
private object ExecutorAllocationManager { private object ExecutorAllocationManager {
......
...@@ -537,6 +537,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli ...@@ -537,6 +537,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_taskScheduler.postStartHook() _taskScheduler.postStartHook()
_env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler)) _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager)) _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
// Make sure the context is stopped if the user forgets about it. This avoids leaving // Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment