Commit 7d2eada4 authored by Andrew xia, committed by jerryshao

Add metrics sources for DAGScheduler and BlockManager

Conflicts:

	core/src/main/scala/spark/SparkContext.scala
	core/src/main/scala/spark/SparkEnv.scala
parent e9ac8875
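Both new classes follow the same pattern: each exposes a Codahale MetricRegistry plus a source name, and is registered with the driver's MetricsSystem from SparkContext. The exact spark.metrics.source.Source trait is not shown in this commit, but judging from the two implementations below it looks roughly like the sketch that follows (an inference, not the actual trait definition):

// Sketch of the Source contract inferred from DAGSchedulerSource and
// BlockManagerSource below; the real trait in spark.metrics.source may differ.
package spark.metrics.source

import com.codahale.metrics.MetricRegistry

trait Source {
  def sourceName: String               // name the MetricsSystem reports the source under
  def metricRegistry: MetricRegistry   // registry holding the source's gauges
}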
core/src/main/scala/spark/SparkContext.scala
@@ -60,11 +60,11 @@ import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import ui.{SparkUI}
@@ -270,6 +270,15 @@ class SparkContext(
  // Post init
  taskScheduler.postStartHook()

  val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
  val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)

  def initDriverMetrics() = {
    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
  }

  initDriverMetrics()

  // Methods for creating RDDs

  /** Distribute a local Scala collection to form an RDD. */
......
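Any other driver-side component can publish metrics the same way: implement Source and hand an instance to SparkEnv.get.metricsSystem.registerSource. The sketch below uses a made-up ShuffleSource purely to illustrate the registration pattern; it is not part of this commit.

// Hypothetical source, registered exactly like the two sources added here.
import com.codahale.metrics.{Gauge, MetricRegistry}
import spark.metrics.source.Source

private[spark] class ShuffleSource extends Source {
  val metricRegistry = new MetricRegistry()
  val sourceName = "Shuffle"

  metricRegistry.register(MetricRegistry.name("example", "constant"), new Gauge[Int] {
    override def getValue: Int = 42  // placeholder value for the sketch
  })
}

// On the driver, after SparkEnv has been initialized:
//   SparkEnv.get.metricsSystem.registerSource(new ShuffleSource)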
core/src/main/scala/spark/SparkEnv.scala
@@ -30,6 +30,7 @@ import spark.network.ConnectionManager
import spark.serializer.{Serializer, SerializerManager}
import spark.util.AkkaUtils
import spark.api.python.PythonWorkerFactory
import spark.metrics._
/**
@@ -53,6 +54,7 @@ class SparkEnv (
val connectionManager: ConnectionManager,
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
// To be set only as part of initialization of SparkContext.
// (executorId, defaultHostPort) => executorHostPort
// If executorId is NOT found, return defaultHostPort
@@ -184,6 +186,9 @@ object SparkEnv extends Logging {
httpFileServer.initialize()
System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
val metricsSystem = MetricsSystem.createMetricsSystem("driver")
metricsSystem.start()
// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
// directory.
@@ -213,6 +218,7 @@
connectionManager,
httpFileServer,
sparkFilesDir,
metricsSystem,
None)
}
}
core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -30,7 +30,7 @@ import spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import spark.scheduler.cluster.TaskInfo
import spark.storage.{BlockManager, BlockManagerMaster}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
import spark.metrics.MetricsSystem
/**
* A Scheduler subclass that implements stage-oriented scheduling. It computes a DAG of stages for
* each job, keeps track of which RDDs and stage outputs are materialized, and computes a minimal
@@ -126,7 +126,6 @@ class DAGScheduler(
val resultStageToJob = new HashMap[Stage, ActiveJob]
val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
// Start a thread to run the DAGScheduler event loop
def start() {
new Thread("DAGScheduler") {
......
core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala (new file)

package spark.scheduler

import com.codahale.metrics.{Gauge, MetricRegistry}

import spark.metrics.source.Source

/**
 * Exposes DAGScheduler state as Codahale gauges: counts of failed, running
 * and waiting stages, plus job counters.
 */
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
  val metricRegistry = new MetricRegistry()
  val sourceName = "DAGScheduler"

  metricRegistry.register(MetricRegistry.name("stage", "failedStage"), new Gauge[Int] {
    override def getValue: Int = dagScheduler.failed.size
  })

  metricRegistry.register(MetricRegistry.name("stage", "runningStage"), new Gauge[Int] {
    override def getValue: Int = dagScheduler.running.size
  })

  metricRegistry.register(MetricRegistry.name("stage", "waitingStage"), new Gauge[Int] {
    override def getValue: Int = dagScheduler.waiting.size
  })

  // nextRunId counts job run ids, so this reports the total number of jobs
  // submitted so far rather than a live count.
  metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
    override def getValue: Int = dagScheduler.nextRunId.get()
  })

  metricRegistry.register(MetricRegistry.name("job", "ActiveJobs"), new Gauge[Int] {
    override def getValue: Int = dagScheduler.activeJobs.size
  })
}
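Because these are plain Codahale gauges, their current values can be read straight off the registry, which is handy for a quick check outside any configured sink. A minimal sketch, assuming a live dagScheduler is in scope (not part of the commit):

import scala.collection.JavaConverters._

val source = new DAGSchedulerSource(dagScheduler)
// getGauges returns a java.util.SortedMap keyed by the dotted metric name,
// e.g. "stage.failedStage" or "job.allJobs".
source.metricRegistry.getGauges.asScala.foreach { case (name, gauge) =>
  println(name + " = " + gauge.getValue)
}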
core/src/main/scala/spark/storage/BlockManagerSource.scala (new file)

package spark.storage

import com.codahale.metrics.{Gauge, MetricRegistry}

import spark.metrics.source.Source

/**
 * Exposes cluster-wide storage metrics, aggregated from the storage status
 * that every block manager reports to the master.
 */
private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
  val metricRegistry = new MetricRegistry()
  val sourceName = "BlockManager"

  // Total memory available for caching, summed over all block managers.
  metricRegistry.register(MetricRegistry.name("memory", "maxMem"), new Gauge[Long] {
    override def getValue: Long = {
      val storageStatusList = blockManager.master.getStorageStatus
      storageStatusList.map(_.maxMem).reduce(_ + _)
    }
  })

  // Memory still free for caching, summed over all block managers.
  metricRegistry.register(MetricRegistry.name("memory", "remainingMem"), new Gauge[Long] {
    override def getValue: Long = {
      val storageStatusList = blockManager.master.getStorageStatus
      storageStatusList.map(_.memRemaining).reduce(_ + _)
    }
  })

  // Disk space used by all stored blocks; 0 when no blocks are reported.
  metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed"), new Gauge[Long] {
    override def getValue: Long = {
      val storageStatusList = blockManager.master.getStorageStatus
      storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).reduceOption(_ + _).getOrElse(0L)
    }
  })
}
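One detail worth noting: the two memory gauges fold with reduce, which throws if the storage status list is ever empty, while the disk gauge uses reduceOption(...).getOrElse(0L) and simply reports 0 when no blocks are known. The difference in plain Scala, independent of Spark:

val sizes: List[Long] = List()

// sizes.reduce(_ + _)  // UnsupportedOperationException on an empty list
val total = sizes.reduceOption(_ + _).getOrElse(0L)  // yields 0L instead
println(total)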