Commit 188abbf8 authored by Grace Huang

Revert "SPARK-900 Use coarser grained naming for metrics"

This reverts commit 4b68be5f.
parent a2af6b54
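In effect, the revert removes the NamingConventions import from each metrics source and puts the original metric names back at every register call; a short runnable sketch of the resulting naming difference follows the deleted NamingConventions.scala at the end of the diff.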
core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala

@@ -20,7 +20,6 @@ package org.apache.spark.deploy.master
 import com.codahale.metrics.{Gauge, MetricRegistry}

 import org.apache.spark.metrics.source.Source
-import org.apache.spark.util.NamingConventions

 class ApplicationSource(val application: ApplicationInfo) extends Source {
   val metricRegistry = new MetricRegistry()
@@ -31,11 +30,11 @@ class ApplicationSource(val application: ApplicationInfo) extends Source {
     override def getValue: String = application.state.toString
   })

-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] {
     override def getValue: Long = application.duration
   })

-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
     override def getValue: Int = application.coresGranted
   })
core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala

@@ -20,24 +20,23 @@ package org.apache.spark.deploy.master
 import com.codahale.metrics.{Gauge, MetricRegistry}

 import org.apache.spark.metrics.source.Source
-import org.apache.spark.util.NamingConventions

 private[spark] class MasterSource(val master: Master) extends Source {
   val metricRegistry = new MetricRegistry()
   val sourceName = "master"

   // Gauge for worker numbers in cluster
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("workers","number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
     override def getValue: Int = master.workers.size
   })

   // Gauge for application numbers in cluster
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
     override def getValue: Int = master.apps.size
   })

   // Gauge for waiting application numbers in cluster
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
     override def getValue: Int = master.waitingApps.size
   })
 }
core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala

@@ -20,33 +20,32 @@ package org.apache.spark.deploy.worker
 import com.codahale.metrics.{Gauge, MetricRegistry}

 import org.apache.spark.metrics.source.Source
-import org.apache.spark.util.NamingConventions

 private[spark] class WorkerSource(val worker: Worker) extends Source {
   val sourceName = "worker"
   val metricRegistry = new MetricRegistry()

-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
     override def getValue: Int = worker.executors.size
   })

   // Gauge for cores used of this worker
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
     override def getValue: Int = worker.coresUsed
   })

   // Gauge for memory used of this worker
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
     override def getValue: Int = worker.memoryUsed
   })

   // Gauge for cores free of this worker
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
     override def getValue: Int = worker.coresFree
   })

   // Gauge for memory free of this worker
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
     override def getValue: Int = worker.memoryFree
   })
 }
core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala

@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.LocalFileSystem
 import scala.collection.JavaConversions._

 import org.apache.spark.metrics.source.Source
-import org.apache.spark.util.NamingConventions

 class ExecutorSource(val executor: Executor, executorId: String) extends Source {
   private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
@@ -44,31 +43,31 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source
   val sourceName = "executor.%s".format(executorId)

   // Gauge for executor thread pool's actively executing task counts
-  metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getActiveCount()
   })

   // Gauge for executor thread pool's approximate total number of tasks that have been completed
-  metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
     override def getValue: Long = executor.threadPool.getCompletedTaskCount()
   })

   // Gauge for executor thread pool's current number of threads
-  metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getPoolSize()
   })

   // Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool
-  metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getMaximumPoolSize()
   })

   // Gauge for file system stats of this executor
   for (scheme <- Array("hdfs", "file")) {
-    registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "bytes"), _.getBytesRead(), 0L)
-    registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "bytes"), _.getBytesWritten(), 0L)
-    registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "ops"), _.getReadOps(), 0)
-    registerFileSystemStat(scheme, NamingConventions.makeMetricName("largeRead", "ops"), _.getLargeReadOps(), 0)
-    registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "ops"), _.getWriteOps(), 0)
+    registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L)
+    registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L)
+    registerFileSystemStat(scheme, "readOps", _.getReadOps(), 0)
+    registerFileSystemStat(scheme, "largeReadOps", _.getLargeReadOps(), 0)
+    registerFileSystemStat(scheme, "writeOps", _.getWriteOps(), 0)
   }
 }
core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala

@@ -21,30 +21,29 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.SparkContext
-import org.apache.spark.util.NamingConventions

 private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
     extends Source {
   val metricRegistry = new MetricRegistry()
   val sourceName = "%s.DAGScheduler".format(sc.appName)

-  metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.failed.size
   })

-  metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.running.size
   })

-  metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.waiting.size
   })

-  metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.nextJobId.get()
   })

-  metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.activeJobs.size
   })
 }
core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala

@@ -21,7 +21,6 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.SparkContext
-import org.apache.spark.util.NamingConventions

 private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
@@ -29,7 +28,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
   val metricRegistry = new MetricRegistry()
   val sourceName = "%s.BlockManager".format(sc.appName)

-  metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
@@ -37,7 +36,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
     }
   })

-  metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
@@ -45,7 +44,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
     }
   })

-  metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
@@ -54,7 +53,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
     }
   })

-  metricRegistry.register(MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val diskSpaceUsed = storageStatusList
core/src/main/scala/org/apache/spark/util/NamingConventions.scala (deleted by this commit)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util

/**
 * All utilities related to naming conventions.
 */
private[spark] object NamingConventions {

  /**
   * Lower camelCase: converts the phrases into camelCase style with the first letter lowercase.
   */
  def lowerCamelCaseConversion(phrases: Seq[String]): Seq[String] = {
    var first = true

    for (elem <- phrases) yield {
      if (first) {
        first = false
        elem
      }
      else {
        elem.capitalize
      }
    }
  }

  /**
   * The standard camelCase style.
   */
  def camelCaseConversion(phrases: Seq[String]): Seq[String] = {
    phrases.map(_.capitalize)
  }

  def noConversion = { x: Seq[String] => x }

  /**
   * Concatenates the words using a certain naming style.
   * The default style is lower camelCase with an empty connector.
   */
  def makeIdentifier(phrases: Seq[String], namingConversion: (Seq[String]) => Seq[String] = lowerCamelCaseConversion)(implicit connector: String = ""): String = {
    namingConversion(phrases.filter(_.nonEmpty)).mkString(connector)
  }

  def makeMetricName(phrases: String*): String = {
    makeIdentifier(phrases, noConversion)("_")
  }
}
\ No newline at end of file
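For reference, a minimal runnable sketch (not part of the commit; NamingDemo is an illustrative name) of what the deleted helper produced next to the restored plain names. It assumes the NamingConventions object above and the Codahale metrics-core library are on the classpath:

import com.codahale.metrics.MetricRegistry
import org.apache.spark.util.NamingConventions

object NamingDemo extends App {
  // The deleted helper joins its phrases with "_" and applies no case conversion.
  println(NamingConventions.makeMetricName("runtime", "ms"))    // prints "runtime_ms"
  println(NamingConventions.makeMetricName("cores", "number"))  // prints "cores_number"

  // The restored call sites pass the parts straight to Codahale's
  // MetricRegistry.name, which joins them with ".".
  println(MetricRegistry.name("runtime_ms"))       // prints "runtime_ms"
  println(MetricRegistry.name("cores", "number"))  // prints "cores.number"
}

So single-part names like "runtime_ms" come out the same either way, while multi-part names move from underscore-joined (e.g. "cores_number") back to dot-joined (e.g. "cores.number") in the reported metrics.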