From 188abbf8f1765d3f52eff8b852309352630ab8fb Mon Sep 17 00:00:00 2001
From: Grace Huang <jie.huang@intel.com>
Date: Tue, 8 Oct 2013 17:45:14 +0800
Subject: [PATCH] Revert "SPARK-900 Use coarser grained naming for metrics"

This reverts commit 4b68be5f3c0a251453c184b233b3ca490812dafd.
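
Reverting restores the original Codahale-style metric keys. The deleted
NamingConventions.makeMetricName helper joined name parts with "_", while
MetricRegistry.name separates its segments with "."; so, for example, the
master gauge registered as "workers_number" under SPARK-900 goes back to
"workers.number". Below is a minimal, illustrative Scala sketch of the
difference (the MetricNamingDemo object and its local makeMetricName copy
are not part of this patch):

    import com.codahale.metrics.MetricRegistry

    object MetricNamingDemo {
      // Local copy of the reverted helper: drop empty parts, join with "_"
      // (mirrors NamingConventions.makeMetricName, deleted below).
      def makeMetricName(phrases: String*): String =
        phrases.filter(_.nonEmpty).mkString("_")

      def main(args: Array[String]): Unit = {
        // With SPARK-900 (the change being reverted): one "_"-joined segment.
        println(MetricRegistry.name(makeMetricName("workers", "number"))) // workers_number
        // After this revert: plain Codahale naming, segments joined with ".".
        println(MetricRegistry.name("workers", "number"))                 // workers.number
      }
    }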
---
 .../deploy/master/ApplicationSource.scala     |  5 +-
 .../spark/deploy/master/MasterSource.scala    |  7 +--
 .../spark/deploy/worker/WorkerSource.scala    | 11 ++--
 .../spark/executor/ExecutorSource.scala       | 19 +++---
 .../spark/scheduler/DAGSchedulerSource.scala  | 11 ++--
 .../spark/storage/BlockManagerSource.scala    |  9 ++-
 .../apache/spark/util/NamingConventions.scala | 62 -------------------
 7 files changed, 28 insertions(+), 96 deletions(-)
 delete mode 100644 core/src/main/scala/org/apache/spark/util/NamingConventions.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
index c72322e9c2..5a24042e14 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
@@ -20,7 +20,6 @@ package org.apache.spark.deploy.master
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.metrics.source.Source
-import org.apache.spark.util.NamingConventions
 
 class ApplicationSource(val application: ApplicationInfo) extends Source {
   val metricRegistry = new MetricRegistry()
@@ -31,11 +30,11 @@ class ApplicationSource(val application: ApplicationInfo) extends Source {
     override def getValue: String = application.state.toString
   })
 
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] {
     override def getValue: Long = application.duration
   })
 
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
     override def getValue: Int = application.coresGranted
   })
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
index de3939836b..23d1cb77da 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
@@ -20,24 +20,23 @@ package org.apache.spark.deploy.master
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.metrics.source.Source
-import org.apache.spark.util.NamingConventions
 
 private[spark] class MasterSource(val master: Master) extends Source {
   val metricRegistry = new MetricRegistry()
   val sourceName = "master"
 
   // Gauge for worker numbers in cluster
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("workers","number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
     override def getValue: Int = master.workers.size
   })
 
   // Gauge for application numbers in cluster
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
     override def getValue: Int = master.apps.size
   })
 
   // Gauge for waiting application numbers in cluster
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
     override def getValue: Int = master.waitingApps.size
   })
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
index fc4f4ae99c..df269fd047 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
@@ -20,33 +20,32 @@ package org.apache.spark.deploy.worker
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.metrics.source.Source
-import org.apache.spark.util.NamingConventions
 
 private[spark] class WorkerSource(val worker: Worker) extends Source {
   val sourceName = "worker"
   val metricRegistry = new MetricRegistry()
 
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
     override def getValue: Int = worker.executors.size
   })
 
   // Gauge for cores used of this worker
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
     override def getValue: Int = worker.coresUsed
   })
 
   // Gauge for memory used of this worker
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
     override def getValue: Int = worker.memoryUsed
   })
 
   // Gauge for cores free of this worker
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
     override def getValue: Int = worker.coresFree
   })
 
   // Gauge for memory free of this worker
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
     override def getValue: Int = worker.memoryFree
   })
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index 6cbd154603..18c9dc1c0a 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.LocalFileSystem
 import scala.collection.JavaConversions._
 
 import org.apache.spark.metrics.source.Source
-import org.apache.spark.util.NamingConventions
 
 class ExecutorSource(val executor: Executor, executorId: String) extends Source {
   private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
@@ -44,31 +43,31 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source
   val sourceName = "executor.%s".format(executorId)
 
   // Gauge for executor thread pool's actively executing task counts
-  metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getActiveCount()
   })
 
   // Gauge for executor thread pool's approximate total number of tasks that have been completed
-  metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
     override def getValue: Long = executor.threadPool.getCompletedTaskCount()
   })
 
   // Gauge for executor thread pool's current number of threads
-  metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getPoolSize()
   })
 
 // Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool
-  metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getMaximumPoolSize()
   })
 
   // Gauge for file system stats of this executor
   for (scheme <- Array("hdfs", "file")) {
-    registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "bytes"), _.getBytesRead(), 0L)
-    registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "bytes"), _.getBytesWritten(), 0L)
-    registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "ops"), _.getReadOps(), 0)
-    registerFileSystemStat(scheme, NamingConventions.makeMetricName("largeRead", "ops"), _.getLargeReadOps(), 0)
-    registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "ops"), _.getWriteOps(), 0)
+    registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L)
+    registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L)
+    registerFileSystemStat(scheme, "readOps", _.getReadOps(), 0)
+    registerFileSystemStat(scheme, "largeReadOps", _.getLargeReadOps(), 0)
+    registerFileSystemStat(scheme, "writeOps", _.getWriteOps(), 0)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 9e90a08411..446d490cc9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -21,30 +21,29 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
 
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.SparkContext
-import org.apache.spark.util.NamingConventions
 
 private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
     extends Source {
   val metricRegistry = new MetricRegistry()
   val sourceName = "%s.DAGScheduler".format(sc.appName)
 
-  metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.failed.size
   })
 
-  metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.running.size
   })
 
-  metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.waiting.size
   })
 
-  metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.nextJobId.get()
   })
 
-  metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.activeJobs.size
   })
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 4312250240..acc3951088 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -21,7 +21,6 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
 
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.SparkContext
-import org.apache.spark.util.NamingConventions
 
 
 private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
@@ -29,7 +28,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
   val metricRegistry = new MetricRegistry()
   val sourceName = "%s.BlockManager".format(sc.appName)
 
-  metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
@@ -37,7 +36,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
     }
   })
 
-  metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
@@ -45,7 +44,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
     }
   })
 
-  metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
@@ -54,7 +53,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
     }
   })
 
-  metricRegistry.register(MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val diskSpaceUsed = storageStatusList
diff --git a/core/src/main/scala/org/apache/spark/util/NamingConventions.scala b/core/src/main/scala/org/apache/spark/util/NamingConventions.scala
deleted file mode 100644
index 7263361beb..0000000000
--- a/core/src/main/scala/org/apache/spark/util/NamingConventions.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-/**
- * all utilities related to naming conventions 
- */
-private[spark] object NamingConventions {
-  
-  /**
-   * Lower camelCase which convert the phrases into camelCase style with the first letter lowercase
-   */
-  def lowerCamelCaseConversion(phrases: Seq[String]): Seq[String] = {
-    var first = true
-    
-    for (elem <- phrases) yield {
-      if (first) {
-        first = false
-        elem
-      }
-      else {
-        elem.capitalize
-      }
-    }
-  }
-  
-  /**
-   * The standard camelCase style
-   */
-  def camelCaseConversion(phrases: Seq[String]): Seq[String] = {
-    phrases.map(_.capitalize)
-  }
-  
-  def noConversion = { x: Seq[String] => x }
-  
-  /**
-   * Concatenate the words using certain naming style.
-   * The default style is lowerCamelCase with empty connector.
-   */
-  def makeIdentifier(phrases: Seq[String], namingConversion: (Seq[String]) => Seq[String] = lowerCamelCaseConversion) (implicit connector: String = "" ): String = {
-    namingConversion(phrases.filter(_.nonEmpty)).mkString(connector)
-  }
-  
-  def makeMetricName(phrases: String*): String = {
-    makeIdentifier(phrases, noConversion)("_")
-  }
-}
\ No newline at end of file
-- 
GitLab