From 302bb569f3e1f09e2e7cea5e4e7f5c6953b0fc82 Mon Sep 17 00:00:00 2001
From: Andrew Or <andrew@databricks.com>
Date: Mon, 18 Jan 2016 13:27:18 -0800
Subject: [PATCH] [SPARK-12884] Move classes to their own files for readability

This is a small step in implementing SPARK-10620, which migrates `TaskMetrics` to accumulators. This patch is strictly a cleanup patch and introduces no change in functionality. It literally just moves classes to their own files to avoid having single monolithic ones that contain 10 different classes.

Parent PR: #10717

Author: Andrew Or <andrew@databricks.com>

Closes #10810 from andrewor14/move-things.
---
 .../{Accumulators.scala => Accumulable.scala} | 179 +----------------
 .../scala/org/apache/spark/Accumulator.scala  | 160 +++++++++++++++
 .../apache/spark/InternalAccumulator.scala    |  58 ++++++
 .../apache/spark/executor/InputMetrics.scala  |  77 ++++++++
 .../apache/spark/executor/OutputMetrics.scala |  53 +++++
 .../spark/executor/ShuffleReadMetrics.scala   |  87 ++++++++
 .../spark/executor/ShuffleWriteMetrics.scala  |  53 +++++
 .../apache/spark/executor/TaskMetrics.scala   | 186 +-----------------
 8 files changed, 493 insertions(+), 360 deletions(-)
 rename core/src/main/scala/org/apache/spark/{Accumulators.scala => Accumulable.scala} (54%)
 create mode 100644 core/src/main/scala/org/apache/spark/Accumulator.scala
 create mode 100644 core/src/main/scala/org/apache/spark/InternalAccumulator.scala
 create mode 100644 core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
 create mode 100644 core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
 create mode 100644 core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
 create mode 100644 core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala

diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
similarity index 54%
rename from core/src/main/scala/org/apache/spark/Accumulators.scala
rename to core/src/main/scala/org/apache/spark/Accumulable.scala
index 5592b75afb..a456d420b8 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -20,14 +20,12 @@ package org.apache.spark
 import java.io.{ObjectInputStream, Serializable}
 
 import scala.collection.generic.Growable
-import scala.collection.Map
-import scala.collection.mutable
-import scala.ref.WeakReference
 import scala.reflect.ClassTag
 
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.util.Utils
 
+
 /**
  * A data type that can be accumulated, ie has an commutative and associative "add" operation,
  * but where the result type, `R`, may be different from the element type being added, `T`.
@@ -166,6 +164,7 @@ class Accumulable[R, T] private[spark] (
   override def toString: String = if (value_ == null) "null" else value_.toString
 }
 
+
 /**
  * Helper object defining how to accumulate values of a particular type. An implicit
  * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type.
@@ -201,6 +200,7 @@ trait AccumulableParam[R, T] extends Serializable {
   def zero(initialValue: R): R
 }
 
+
 private[spark] class
 GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
   extends AccumulableParam[R, T] {
@@ -224,176 +224,3 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
     copy
   }
 }
-
-/**
- * A simpler value of [[Accumulable]] where the result type being accumulated is the same
- * as the types of elements being merged, i.e. variables that are only "added" to through an
- * associative operation and can therefore be efficiently supported in parallel. They can be used
- * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
- * value types, and programmers can add support for new types.
- *
- * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
- * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
- * However, they cannot read its value. Only the driver program can read the accumulator's value,
- * using its value method.
- *
- * The interpreter session below shows an accumulator being used to add up the elements of an array:
- *
- * {{{
- * scala> val accum = sc.accumulator(0)
- * accum: spark.Accumulator[Int] = 0
- *
- * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
- * ...
- * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
- *
- * scala> accum.value
- * res2: Int = 10
- * }}}
- *
- * @param initialValue initial value of accumulator
- * @param param helper object defining how to add elements of type `T`
- * @tparam T result type
- */
-class Accumulator[T] private[spark] (
-    @transient private[spark] val initialValue: T,
-    param: AccumulatorParam[T],
-    name: Option[String],
-    internal: Boolean)
-  extends Accumulable[T, T](initialValue, param, name, internal) {
-
-  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
-    this(initialValue, param, name, false)
-  }
-
-  def this(initialValue: T, param: AccumulatorParam[T]) = {
-    this(initialValue, param, None, false)
-  }
-}
-
-/**
- * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
- * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
- * available when you create Accumulators of a specific type.
- *
- * @tparam T type of value to accumulate
- */
-trait AccumulatorParam[T] extends AccumulableParam[T, T] {
-  def addAccumulator(t1: T, t2: T): T = {
-    addInPlace(t1, t2)
-  }
-}
-
-object AccumulatorParam {
-
-  // The following implicit objects were in SparkContext before 1.2 and users had to
-  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
-  // them automatically. However, as there are duplicate codes in SparkContext for backward
-  // compatibility, please update them accordingly if you modify the following implicit objects.
-
-  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
-    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
-    def zero(initialValue: Double): Double = 0.0
-  }
-
-  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
-    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
-    def zero(initialValue: Int): Int = 0
-  }
-
-  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
-    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
-    def zero(initialValue: Long): Long = 0L
-  }
-
-  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
-    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
-    def zero(initialValue: Float): Float = 0f
-  }
-
-  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
-}
-
-// TODO: The multi-thread support in accumulators is kind of lame; check
-// if there's a more intuitive way of doing it right
-private[spark] object Accumulators extends Logging {
-  /**
-   * This global map holds the original accumulator objects that are created on the driver.
-   * It keeps weak references to these objects so that accumulators can be garbage-collected
-   * once the RDDs and user-code that reference them are cleaned up.
-   */
-  val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
-
-  private var lastId: Long = 0
-
-  def newId(): Long = synchronized {
-    lastId += 1
-    lastId
-  }
-
-  def register(a: Accumulable[_, _]): Unit = synchronized {
-    originals(a.id) = new WeakReference[Accumulable[_, _]](a)
-  }
-
-  def remove(accId: Long) {
-    synchronized {
-      originals.remove(accId)
-    }
-  }
-
-  // Add values to the original accumulators with some given IDs
-  def add(values: Map[Long, Any]): Unit = synchronized {
-    for ((id, value) <- values) {
-      if (originals.contains(id)) {
-        // Since we are now storing weak references, we must check whether the underlying data
-        // is valid.
-        originals(id).get match {
-          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
-          case None =>
-            throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
-        }
-      } else {
-        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
-      }
-    }
-  }
-
-}
-
-private[spark] object InternalAccumulator {
-  val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
-  val TEST_ACCUMULATOR = "testAccumulator"
-
-  // For testing only.
-  // This needs to be a def since we don't want to reuse the same accumulator across stages.
-  private def maybeTestAccumulator: Option[Accumulator[Long]] = {
-    if (sys.props.contains("spark.testing")) {
-      Some(new Accumulator(
-        0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true))
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Accumulators for tracking internal metrics.
-   *
-   * These accumulators are created with the stage such that all tasks in the stage will
-   * add to the same set of accumulators. We do this to report the distribution of accumulator
-   * values across all tasks within each stage.
-   */
-  def create(sc: SparkContext): Seq[Accumulator[Long]] = {
-    val internalAccumulators = Seq(
-        // Execution memory refers to the memory used by internal data structures created
-        // during shuffles, aggregations and joins. The value of this accumulator should be
-        // approximately the sum of the peak sizes across all such data structures created
-        // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
-        new Accumulator(
-          0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
-      ) ++ maybeTestAccumulator.toSeq
-    internalAccumulators.foreach { accumulator =>
-      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
-    }
-    internalAccumulators
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
new file mode 100644
index 0000000000..007136e6ae
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.{mutable, Map}
+import scala.ref.WeakReference
+
+
+/**
+ * A simpler value of [[Accumulable]] where the result type being accumulated is the same
+ * as the types of elements being merged, i.e. variables that are only "added" to through an
+ * associative operation and can therefore be efficiently supported in parallel. They can be used
+ * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
+ * value types, and programmers can add support for new types.
+ *
+ * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
+ * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
+ * However, they cannot read its value. Only the driver program can read the accumulator's value,
+ * using its value method.
+ *
+ * The interpreter session below shows an accumulator being used to add up the elements of an array:
+ *
+ * {{{
+ * scala> val accum = sc.accumulator(0)
+ * accum: spark.Accumulator[Int] = 0
+ *
+ * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
+ * ...
+ * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
+ *
+ * scala> accum.value
+ * res2: Int = 10
+ * }}}
+ *
+ * @param initialValue initial value of accumulator
+ * @param param helper object defining how to add elements of type `T`
+ * @tparam T result type
+ */
+class Accumulator[T] private[spark] (
+    @transient private[spark] val initialValue: T,
+    param: AccumulatorParam[T],
+    name: Option[String],
+    internal: Boolean)
+  extends Accumulable[T, T](initialValue, param, name, internal) {
+
+  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
+    this(initialValue, param, name, false)
+  }
+
+  def this(initialValue: T, param: AccumulatorParam[T]) = {
+    this(initialValue, param, None, false)
+  }
+}
+
+
+// TODO: The multi-thread support in accumulators is kind of lame; check
+// if there's a more intuitive way of doing it right
+private[spark] object Accumulators extends Logging {
+  /**
+   * This global map holds the original accumulator objects that are created on the driver.
+   * It keeps weak references to these objects so that accumulators can be garbage-collected
+   * once the RDDs and user-code that reference them are cleaned up.
+   */
+  val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
+
+  private var lastId: Long = 0
+
+  def newId(): Long = synchronized {
+    lastId += 1
+    lastId
+  }
+
+  def register(a: Accumulable[_, _]): Unit = synchronized {
+    originals(a.id) = new WeakReference[Accumulable[_, _]](a)
+  }
+
+  def remove(accId: Long) {
+    synchronized {
+      originals.remove(accId)
+    }
+  }
+
+  // Add values to the original accumulators with some given IDs
+  def add(values: Map[Long, Any]): Unit = synchronized {
+    for ((id, value) <- values) {
+      if (originals.contains(id)) {
+        // Since we are now storing weak references, we must check whether the underlying data
+        // is valid.
+        originals(id).get match {
+          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
+          case None =>
+            throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
+        }
+      } else {
+        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
+      }
+    }
+  }
+
+}
+
+
+/**
+ * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
+ * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
+ * available when you create Accumulators of a specific type.
+ *
+ * @tparam T type of value to accumulate
+ */
+trait AccumulatorParam[T] extends AccumulableParam[T, T] {
+  def addAccumulator(t1: T, t2: T): T = {
+    addInPlace(t1, t2)
+  }
+}
+
+
+object AccumulatorParam {
+
+  // The following implicit objects were in SparkContext before 1.2 and users had to
+  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
+  // them automatically. However, as there are duplicate codes in SparkContext for backward
+  // compatibility, please update them accordingly if you modify the following implicit objects.
+
+  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
+    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
+    def zero(initialValue: Double): Double = 0.0
+  }
+
+  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
+    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
+    def zero(initialValue: Int): Int = 0
+  }
+
+  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
+    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
+    def zero(initialValue: Long): Long = 0L
+  }
+
+  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
+    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
+    def zero(initialValue: Float): Float = 0f
+  }
+
+  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
+}
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
new file mode 100644
index 0000000000..6ea997c079
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+
+// This is moved to its own file because many more things will be added to it in SPARK-10620.
+private[spark] object InternalAccumulator {
+  val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
+  val TEST_ACCUMULATOR = "testAccumulator"
+
+  // For testing only.
+  // This needs to be a def since we don't want to reuse the same accumulator across stages.
+  private def maybeTestAccumulator: Option[Accumulator[Long]] = {
+    if (sys.props.contains("spark.testing")) {
+      Some(new Accumulator(
+        0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Accumulators for tracking internal metrics.
+   *
+   * These accumulators are created with the stage such that all tasks in the stage will
+   * add to the same set of accumulators. We do this to report the distribution of accumulator
+   * values across all tasks within each stage.
+   */
+  def create(sc: SparkContext): Seq[Accumulator[Long]] = {
+    val internalAccumulators = Seq(
+      // Execution memory refers to the memory used by internal data structures created
+      // during shuffles, aggregations and joins. The value of this accumulator should be
+      // approximately the sum of the peak sizes across all such data structures created
+      // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
+      new Accumulator(
+        0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
+    ) ++ maybeTestAccumulator.toSeq
+    internalAccumulators.foreach { accumulator =>
+      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
+    }
+    internalAccumulators
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
new file mode 100644
index 0000000000..8f1d7f89a4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * Method by which input data was read.  Network means that the data was read over the network
+ * from a remote block manager (which may have stored the data on-disk or in-memory).
+ */
+@DeveloperApi
+object DataReadMethod extends Enumeration with Serializable {
+  type DataReadMethod = Value
+  val Memory, Disk, Hadoop, Network = Value
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * Metrics about reading input data.
+ */
+@DeveloperApi
+case class InputMetrics(readMethod: DataReadMethod.Value) {
+
+  /**
+   * This is volatile so that it is visible to the updater thread.
+   */
+  @volatile @transient var bytesReadCallback: Option[() => Long] = None
+
+  /**
+   * Total bytes read.
+   */
+  private var _bytesRead: Long = _
+  def bytesRead: Long = _bytesRead
+  def incBytesRead(bytes: Long): Unit = _bytesRead += bytes
+
+  /**
+   * Total records read.
+   */
+  private var _recordsRead: Long = _
+  def recordsRead: Long = _recordsRead
+  def incRecordsRead(records: Long): Unit = _recordsRead += records
+
+  /**
+   * Invoke the bytesReadCallback and mutate bytesRead.
+   */
+  def updateBytesRead() {
+    bytesReadCallback.foreach { c =>
+      _bytesRead = c()
+    }
+  }
+
+  /**
+   * Register a function that can be called to get up-to-date information on how many bytes the task
+   * has read from an input source.
+   */
+  def setBytesReadCallback(f: Option[() => Long]) {
+    bytesReadCallback = f
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
new file mode 100644
index 0000000000..ad132d004c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * Method by which output data was written.
+ */
+@DeveloperApi
+object DataWriteMethod extends Enumeration with Serializable {
+  type DataWriteMethod = Value
+  val Hadoop = Value
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * Metrics about writing output data.
+ */
+@DeveloperApi
+case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
+  /**
+   * Total bytes written
+   */
+  private var _bytesWritten: Long = _
+  def bytesWritten: Long = _bytesWritten
+  private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = value
+
+  /**
+   * Total records written
+   */
+  private var _recordsWritten: Long = 0L
+  def recordsWritten: Long = _recordsWritten
+  private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value
+}
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
new file mode 100644
index 0000000000..e985b35ace
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * Metrics pertaining to shuffle data read in a given task.
+ */
+@DeveloperApi
+class ShuffleReadMetrics extends Serializable {
+  /**
+   * Number of remote blocks fetched in this shuffle by this task
+   */
+  private var _remoteBlocksFetched: Int = _
+  def remoteBlocksFetched: Int = _remoteBlocksFetched
+  private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
+  private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
+
+  /**
+   * Number of local blocks fetched in this shuffle by this task
+   */
+  private var _localBlocksFetched: Int = _
+  def localBlocksFetched: Int = _localBlocksFetched
+  private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
+  private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
+
+  /**
+   * Time the task spent waiting for remote shuffle blocks. This only includes the time
+   * blocking on shuffle input data. For instance if block B is being fetched while the task is
+   * still not finished processing block A, it is not considered to be blocking on block B.
+   */
+  private var _fetchWaitTime: Long = _
+  def fetchWaitTime: Long = _fetchWaitTime
+  private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
+  private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
+
+  /**
+   * Total number of remote bytes read from the shuffle by this task
+   */
+  private var _remoteBytesRead: Long = _
+  def remoteBytesRead: Long = _remoteBytesRead
+  private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
+  private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
+
+  /**
+   * Shuffle data that was read from the local disk (as opposed to from a remote executor).
+   */
+  private var _localBytesRead: Long = _
+  def localBytesRead: Long = _localBytesRead
+  private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
+
+  /**
+   * Total bytes fetched in the shuffle by this task (both remote and local).
+   */
+  def totalBytesRead: Long = _remoteBytesRead + _localBytesRead
+
+  /**
+   * Number of blocks fetched in this shuffle by this task (remote or local)
+   */
+  def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched
+
+  /**
+   * Total number of records read from the shuffle by this task
+   */
+  private var _recordsRead: Long = _
+  def recordsRead: Long = _recordsRead
+  private[spark] def incRecordsRead(value: Long) = _recordsRead += value
+  private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
+}
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
new file mode 100644
index 0000000000..469ebe26c7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * Metrics pertaining to shuffle data written in a given task.
+ */
+@DeveloperApi
+class ShuffleWriteMetrics extends Serializable {
+  /**
+   * Number of bytes written for the shuffle by this task
+   */
+  @volatile private var _shuffleBytesWritten: Long = _
+  def shuffleBytesWritten: Long = _shuffleBytesWritten
+  private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
+  private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
+
+  /**
+   * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
+   */
+  @volatile private var _shuffleWriteTime: Long = _
+  def shuffleWriteTime: Long = _shuffleWriteTime
+  private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
+  private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
+
+  /**
+   * Total number of records written to the shuffle by this task
+   */
+  @volatile private var _shuffleRecordsWritten: Long = _
+  def shuffleRecordsWritten: Long = _shuffleRecordsWritten
+  private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
+  private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
+  private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
+}
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 42207a9553..ce1fcbff71 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -27,6 +27,7 @@ import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.storage.{BlockId, BlockStatus}
 import org.apache.spark.util.Utils
 
+
 /**
  * :: DeveloperApi ::
  * Metrics tracked during the execution of a task.
@@ -241,6 +242,7 @@ class TaskMetrics extends Serializable {
   }
 }
 
+
 private[spark] object TaskMetrics {
   private val hostNameCache = new ConcurrentHashMap[String, String]()
 
@@ -251,187 +253,3 @@ private[spark] object TaskMetrics {
     if (canonicalHost != null) canonicalHost else host
   }
 }
-
-/**
- * :: DeveloperApi ::
- * Method by which input data was read.  Network means that the data was read over the network
- * from a remote block manager (which may have stored the data on-disk or in-memory).
- */
-@DeveloperApi
-object DataReadMethod extends Enumeration with Serializable {
-  type DataReadMethod = Value
-  val Memory, Disk, Hadoop, Network = Value
-}
-
-/**
- * :: DeveloperApi ::
- * Method by which output data was written.
- */
-@DeveloperApi
-object DataWriteMethod extends Enumeration with Serializable {
-  type DataWriteMethod = Value
-  val Hadoop = Value
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics about reading input data.
- */
-@DeveloperApi
-case class InputMetrics(readMethod: DataReadMethod.Value) {
-
-  /**
-   * This is volatile so that it is visible to the updater thread.
-   */
-  @volatile @transient var bytesReadCallback: Option[() => Long] = None
-
-  /**
-   * Total bytes read.
-   */
-  private var _bytesRead: Long = _
-  def bytesRead: Long = _bytesRead
-  def incBytesRead(bytes: Long): Unit = _bytesRead += bytes
-
-  /**
-   * Total records read.
-   */
-  private var _recordsRead: Long = _
-  def recordsRead: Long = _recordsRead
-  def incRecordsRead(records: Long): Unit = _recordsRead += records
-
-  /**
-   * Invoke the bytesReadCallback and mutate bytesRead.
-   */
-  def updateBytesRead() {
-    bytesReadCallback.foreach { c =>
-      _bytesRead = c()
-    }
-  }
-
- /**
-  * Register a function that can be called to get up-to-date information on how many bytes the task
-  * has read from an input source.
-  */
-  def setBytesReadCallback(f: Option[() => Long]) {
-    bytesReadCallback = f
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics about writing output data.
- */
-@DeveloperApi
-case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
-  /**
-   * Total bytes written
-   */
-  private var _bytesWritten: Long = _
-  def bytesWritten: Long = _bytesWritten
-  private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = value
-
-  /**
-   * Total records written
-   */
-  private var _recordsWritten: Long = 0L
-  def recordsWritten: Long = _recordsWritten
-  private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics pertaining to shuffle data read in a given task.
- */
-@DeveloperApi
-class ShuffleReadMetrics extends Serializable {
-  /**
-   * Number of remote blocks fetched in this shuffle by this task
-   */
-  private var _remoteBlocksFetched: Int = _
-  def remoteBlocksFetched: Int = _remoteBlocksFetched
-  private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
-  private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
-
-  /**
-   * Number of local blocks fetched in this shuffle by this task
-   */
-  private var _localBlocksFetched: Int = _
-  def localBlocksFetched: Int = _localBlocksFetched
-  private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
-  private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
-
-  /**
-   * Time the task spent waiting for remote shuffle blocks. This only includes the time
-   * blocking on shuffle input data. For instance if block B is being fetched while the task is
-   * still not finished processing block A, it is not considered to be blocking on block B.
-   */
-  private var _fetchWaitTime: Long = _
-  def fetchWaitTime: Long = _fetchWaitTime
-  private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
-  private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
-
-  /**
-   * Total number of remote bytes read from the shuffle by this task
-   */
-  private var _remoteBytesRead: Long = _
-  def remoteBytesRead: Long = _remoteBytesRead
-  private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
-  private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
-
-  /**
-   * Shuffle data that was read from the local disk (as opposed to from a remote executor).
-   */
-  private var _localBytesRead: Long = _
-  def localBytesRead: Long = _localBytesRead
-  private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
-
-  /**
-   * Total bytes fetched in the shuffle by this task (both remote and local).
-   */
-  def totalBytesRead: Long = _remoteBytesRead + _localBytesRead
-
-  /**
-   * Number of blocks fetched in this shuffle by this task (remote or local)
-   */
-  def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched
-
-  /**
-   * Total number of records read from the shuffle by this task
-   */
-  private var _recordsRead: Long = _
-  def recordsRead: Long = _recordsRead
-  private[spark] def incRecordsRead(value: Long) = _recordsRead += value
-  private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
-}
-
-/**
- * :: DeveloperApi ::
- * Metrics pertaining to shuffle data written in a given task.
- */
-@DeveloperApi
-class ShuffleWriteMetrics extends Serializable {
-  /**
-   * Number of bytes written for the shuffle by this task
-   */
-  @volatile private var _shuffleBytesWritten: Long = _
-  def shuffleBytesWritten: Long = _shuffleBytesWritten
-  private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
-  private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
-
-  /**
-   * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
-   */
-  @volatile private var _shuffleWriteTime: Long = _
-  def shuffleWriteTime: Long = _shuffleWriteTime
-  private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
-  private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
-
-  /**
-   * Total number of records written to the shuffle by this task
-   */
-  @volatile private var _shuffleRecordsWritten: Long = _
-  def shuffleRecordsWritten: Long = _shuffleRecordsWritten
-  private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
-  private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
-  private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
-}
-- 
GitLab