Commit 302bb569 authored by Andrew Or, committed by Reynold Xin

[SPARK-12884] Move classes to their own files for readability

This is a small step in implementing SPARK-10620, which migrates `TaskMetrics` to accumulators. This patch is strictly a cleanup patch and introduces no change in functionality. It literally just moves classes to their own files to avoid having single monolithic ones that contain 10 different classes.

Parent PR: #10717

Author: Andrew Or <andrew@databricks.com>

Closes #10810 from andrewor14/move-things.
parent 5e492e9d
@@ -20,14 +20,12 @@ package org.apache.spark
import java.io.{ObjectInputStream, Serializable}
import scala.collection.generic.Growable
import scala.collection.Map
import scala.collection.mutable
import scala.ref.WeakReference
import scala.reflect.ClassTag
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.Utils
/**
* A data type that can be accumulated, i.e. has a commutative and associative "add" operation,
* but where the result type, `R`, may be different from the element type being added, `T`.
@@ -166,6 +164,7 @@ class Accumulable[R, T] private[spark] (
override def toString: String = if (value_ == null) "null" else value_.toString
}
/**
* Helper object defining how to accumulate values of a particular type. An implicit
* AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type.
@@ -201,6 +200,7 @@ trait AccumulableParam[R, T] extends Serializable {
def zero(initialValue: R): R
}
private[spark] class
GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
extends AccumulableParam[R, T] {
@@ -224,176 +224,3 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
copy
}
}
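As a usage aside: `GrowableAccumulableParam` is what backs `SparkContext.accumulableCollection`. A minimal driver-side sketch, assuming an existing `SparkContext` named `sc`:

{{{
import scala.collection.mutable

// Accumulate individual strings (T) into a mutable set (R) across tasks.
val seen = sc.accumulableCollection(mutable.HashSet[String]())

sc.parallelize(Seq("a", "b", "a")).foreach { word =>
  seen += word            // tasks may only add; they cannot read the value
}

println(seen.value)       // only the driver may read the merged result, e.g. Set(a, b)
}}}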
/**
* A simpler value of [[Accumulable]] where the result type being accumulated is the same
* as the types of elements being merged, i.e. variables that are only "added" to through an
* associative operation and can therefore be efficiently supported in parallel. They can be used
* to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
* value types, and programmers can add support for new types.
*
* An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
* Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
* However, they cannot read its value. Only the driver program can read the accumulator's value,
* using its value method.
*
* The interpreter session below shows an accumulator being used to add up the elements of an array:
*
* {{{
* scala> val accum = sc.accumulator(0)
* accum: spark.Accumulator[Int] = 0
*
* scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
* ...
* 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
*
* scala> accum.value
* res2: Int = 10
* }}}
*
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
class Accumulator[T] private[spark] (
@transient private[spark] val initialValue: T,
param: AccumulatorParam[T],
name: Option[String],
internal: Boolean)
extends Accumulable[T, T](initialValue, param, name, internal) {
def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
this(initialValue, param, name, false)
}
def this(initialValue: T, param: AccumulatorParam[T]) = {
this(initialValue, param, None, false)
}
}
/**
* A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
* in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
* available when you create Accumulators of a specific type.
*
* @tparam T type of value to accumulate
*/
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T): T = {
addInPlace(t1, t2)
}
}
object AccumulatorParam {
// The following implicit objects were in SparkContext before 1.2 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. However, as there is duplicate code in SparkContext for backward
// compatibility, please update them accordingly if you modify the following implicit objects.
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double): Double = 0.0
}
implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int): Int = 0
}
implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long): Long = t1 + t2
def zero(initialValue: Long): Long = 0L
}
implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float): Float = t1 + t2
def zero(initialValue: Float): Float = 0f
}
// TODO: Add AccumulatorParams for other types, e.g. lists and strings
}
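These implicits are what allow `sc.accumulator(0)` or `sc.accumulator(0.0)` to compile without passing a param explicitly. A minimal sketch, including a hypothetical param for the list/set case mentioned in the TODO (the `SetAccumulatorParam` name below is illustrative, not part of Spark):

{{{
// Resolved implicitly via IntAccumulatorParam / DoubleAccumulatorParam above.
val counter = sc.accumulator(0)
val total   = sc.accumulator(0.0)

// Hypothetical: a commutative, associative param for accumulating sets of strings.
implicit object SetAccumulatorParam extends AccumulatorParam[Set[String]] {
  def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = s1 ++ s2
  def zero(initialValue: Set[String]): Set[String] = Set.empty
}

val distinctWords = sc.accumulator(Set.empty[String])
sc.parallelize(Seq("a", "b", "a")).foreach(w => distinctWords += Set(w))
// distinctWords.value == Set("a", "b") on the driver
}}}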
// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private[spark] object Accumulators extends Logging {
/**
* This global map holds the original accumulator objects that are created on the driver.
* It keeps weak references to these objects so that accumulators can be garbage-collected
* once the RDDs and user-code that reference them are cleaned up.
*/
val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
private var lastId: Long = 0
def newId(): Long = synchronized {
lastId += 1
lastId
}
def register(a: Accumulable[_, _]): Unit = synchronized {
originals(a.id) = new WeakReference[Accumulable[_, _]](a)
}
def remove(accId: Long) {
synchronized {
originals.remove(accId)
}
}
// Add values to the original accumulators with some given IDs
def add(values: Map[Long, Any]): Unit = synchronized {
for ((id, value) <- values) {
if (originals.contains(id)) {
// Since we are now storing weak references, we must check whether the underlying data
// is valid.
originals(id).get match {
case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
case None =>
throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
}
} else {
logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
}
}
}
}
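A rough sketch of how the registry above is exercised on the driver. `Accumulators` is `private[spark]`, so this is illustrative only and assumes it runs inside the `org.apache.spark` package:

{{{
// A driver-side accumulator gets an id from newId() and is tracked in `originals`
// via register(); registration normally happens when the accumulator is created,
// so calling it again here is harmless for the sketch.
val acc = sc.accumulator(0L, "recordsProcessed")
Accumulators.register(acc)

// When a task completes, the scheduler merges its partial updates back by id.
Accumulators.add(Map(acc.id -> 5L))
// acc.value == 5 on the driver

// Once the accumulator is no longer referenced, the cleaner drops the weak reference.
Accumulators.remove(acc.id)
}}}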
private[spark] object InternalAccumulator {
val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
val TEST_ACCUMULATOR = "testAccumulator"
// For testing only.
// This needs to be a def since we don't want to reuse the same accumulator across stages.
private def maybeTestAccumulator: Option[Accumulator[Long]] = {
if (sys.props.contains("spark.testing")) {
Some(new Accumulator(
0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true))
} else {
None
}
}
/**
* Accumulators for tracking internal metrics.
*
* These accumulators are created with the stage such that all tasks in the stage will
* add to the same set of accumulators. We do this to report the distribution of accumulator
* values across all tasks within each stage.
*/
def create(sc: SparkContext): Seq[Accumulator[Long]] = {
val internalAccumulators = Seq(
// Execution memory refers to the memory used by internal data structures created
// during shuffles, aggregations and joins. The value of this accumulator should be
// approximately the sum of the peak sizes across all such data structures created
// in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
new Accumulator(
0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
) ++ maybeTestAccumulator.toSeq
internalAccumulators.foreach { accumulator =>
sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
}
internalAccumulators
}
}
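A sketch of how this is consumed per stage (again `private[spark]`; names are taken from the code above):

{{{
// Created once per stage so that all of its tasks add to the same accumulators,
// which lets Spark report the distribution of values across tasks.
val internal: Seq[Accumulator[Long]] = InternalAccumulator.create(sc)

// e.g. a task recording its peak execution memory:
internal
  .find(_.name == Some(InternalAccumulator.PEAK_EXECUTION_MEMORY))
  .foreach(_ += 16L * 1024 * 1024)
}}}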
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import scala.collection.{mutable, Map}
import scala.ref.WeakReference
/**
* A simpler value of [[Accumulable]] where the result type being accumulated is the same
* as the types of elements being merged, i.e. variables that are only "added" to through an
* associative operation and can therefore be efficiently supported in parallel. They can be used
* to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
* value types, and programmers can add support for new types.
*
* An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
* Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
* However, they cannot read its value. Only the driver program can read the accumulator's value,
* using its value method.
*
* The interpreter session below shows an accumulator being used to add up the elements of an array:
*
* {{{
* scala> val accum = sc.accumulator(0)
* accum: spark.Accumulator[Int] = 0
*
* scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
* ...
* 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
*
* scala> accum.value
* res2: Int = 10
* }}}
*
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
class Accumulator[T] private[spark] (
@transient private[spark] val initialValue: T,
param: AccumulatorParam[T],
name: Option[String],
internal: Boolean)
extends Accumulable[T, T](initialValue, param, name, internal) {
def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
this(initialValue, param, name, false)
}
def this(initialValue: T, param: AccumulatorParam[T]) = {
this(initialValue, param, None, false)
}
}
// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private[spark] object Accumulators extends Logging {
/**
* This global map holds the original accumulator objects that are created on the driver.
* It keeps weak references to these objects so that accumulators can be garbage-collected
* once the RDDs and user-code that reference them are cleaned up.
*/
val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
private var lastId: Long = 0
def newId(): Long = synchronized {
lastId += 1
lastId
}
def register(a: Accumulable[_, _]): Unit = synchronized {
originals(a.id) = new WeakReference[Accumulable[_, _]](a)
}
def remove(accId: Long) {
synchronized {
originals.remove(accId)
}
}
// Add values to the original accumulators with some given IDs
def add(values: Map[Long, Any]): Unit = synchronized {
for ((id, value) <- values) {
if (originals.contains(id)) {
// Since we are now storing weak references, we must check whether the underlying data
// is valid.
originals(id).get match {
case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
case None =>
throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
}
} else {
logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
}
}
}
}
/**
* A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
* in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
* available when you create Accumulators of a specific type.
*
* @tparam T type of value to accumulate
*/
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T): T = {
addInPlace(t1, t2)
}
}
object AccumulatorParam {
// The following implicit objects were in SparkContext before 1.2 and users had to
// `import SparkContext._` to enable them. Now we move them here to make the compiler find
// them automatically. However, as there is duplicate code in SparkContext for backward
// compatibility, please update them accordingly if you modify the following implicit objects.
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double): Double = 0.0
}
implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int): Int = 0
}
implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
def addInPlace(t1: Long, t2: Long): Long = t1 + t2
def zero(initialValue: Long): Long = 0L
}
implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
def addInPlace(t1: Float, t2: Float): Float = t1 + t2
def zero(initialValue: Float): Float = 0f
}
// TODO: Add AccumulatorParams for other types, e.g. lists and strings
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
// This is moved to its own file because many more things will be added to it in SPARK-10620.
private[spark] object InternalAccumulator {
val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
val TEST_ACCUMULATOR = "testAccumulator"
// For testing only.
// This needs to be a def since we don't want to reuse the same accumulator across stages.
private def maybeTestAccumulator: Option[Accumulator[Long]] = {
if (sys.props.contains("spark.testing")) {
Some(new Accumulator(
0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true))
} else {
None
}
}
/**
* Accumulators for tracking internal metrics.
*
* These accumulators are created with the stage such that all tasks in the stage will
* add to the same set of accumulators. We do this to report the distribution of accumulator
* values across all tasks within each stage.
*/
def create(sc: SparkContext): Seq[Accumulator[Long]] = {
val internalAccumulators = Seq(
// Execution memory refers to the memory used by internal data structures created
// during shuffles, aggregations and joins. The value of this accumulator should be
// approximately the sum of the peak sizes across all such data structures created
// in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
new Accumulator(
0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
) ++ maybeTestAccumulator.toSeq
internalAccumulators.foreach { accumulator =>
sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
}
internalAccumulators
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
* Method by which input data was read. Network means that the data was read over the network
* from a remote block manager (which may have stored the data on-disk or in-memory).
*/
@DeveloperApi
object DataReadMethod extends Enumeration with Serializable {
type DataReadMethod = Value
val Memory, Disk, Hadoop, Network = Value
}
/**
* :: DeveloperApi ::
* Metrics about reading input data.
*/
@DeveloperApi
case class InputMetrics(readMethod: DataReadMethod.Value) {
/**
* This is volatile so that it is visible to the updater thread.
*/
@volatile @transient var bytesReadCallback: Option[() => Long] = None
/**
* Total bytes read.
*/
private var _bytesRead: Long = _
def bytesRead: Long = _bytesRead
def incBytesRead(bytes: Long): Unit = _bytesRead += bytes
/**
* Total records read.
*/
private var _recordsRead: Long = _
def recordsRead: Long = _recordsRead
def incRecordsRead(records: Long): Unit = _recordsRead += records
/**
* Invoke the bytesReadCallback and mutate bytesRead.
*/
def updateBytesRead() {
bytesReadCallback.foreach { c =>
_bytesRead = c()
}
}
/**
* Register a function that can be called to get up-to-date information on how many bytes the task
* has read from an input source.
*/
def setBytesReadCallback(f: Option[() => Long]) {
bytesReadCallback = f
}
}
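A sketch of the callback pattern this class supports; the running byte count below is a stand-in for whatever the reader exposes (in Spark it typically comes from Hadoop FileSystem statistics):

{{{
val in = InputMetrics(DataReadMethod.Hadoop)

// Hypothetical running byte count exposed by the underlying reader.
var bytesSoFar = 0L
in.setBytesReadCallback(Some(() => bytesSoFar))

bytesSoFar = 4096L        // the reader makes progress...
in.updateBytesRead()      // ...and the task refreshes bytesRead from the callback
in.incRecordsRead(10L)

// in.bytesRead == 4096, in.recordsRead == 10
}}}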
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
* Method by which output data was written.
*/
@DeveloperApi
object DataWriteMethod extends Enumeration with Serializable {
type DataWriteMethod = Value
val Hadoop = Value
}
/**
* :: DeveloperApi ::
* Metrics about writing output data.
*/
@DeveloperApi
case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
/**
* Total bytes written
*/
private var _bytesWritten: Long = _
def bytesWritten: Long = _bytesWritten
private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = value
/**
* Total records written
*/
private var _recordsWritten: Long = 0L
def recordsWritten: Long = _recordsWritten
private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value
}
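For completeness, the writer-side counterpart is driven by plain setters; they are `private[spark]`, so this sketch assumes Spark-internal code:

{{{
val out = OutputMetrics(DataWriteMethod.Hadoop)
out.setBytesWritten(1024L)     // typically taken from the output stream or FileSystem statistics
out.setRecordsWritten(8L)
// out.bytesWritten == 1024, out.recordsWritten == 8
}}}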
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
* Metrics pertaining to shuffle data read in a given task.
*/
@DeveloperApi
class ShuffleReadMetrics extends Serializable {
/**
* Number of remote blocks fetched in this shuffle by this task
*/
private var _remoteBlocksFetched: Int = _
def remoteBlocksFetched: Int = _remoteBlocksFetched
private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
/**
* Number of local blocks fetched in this shuffle by this task
*/
private var _localBlocksFetched: Int = _
def localBlocksFetched: Int = _localBlocksFetched
private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
/**
* Time the task spent waiting for remote shuffle blocks. This only includes the time
* blocking on shuffle input data. For instance if block B is being fetched while the task is
* still not finished processing block A, it is not considered to be blocking on block B.
*/
private var _fetchWaitTime: Long = _
def fetchWaitTime: Long = _fetchWaitTime
private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
/**
* Total number of remote bytes read from the shuffle by this task
*/
private var _remoteBytesRead: Long = _
def remoteBytesRead: Long = _remoteBytesRead
private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
/**
* Shuffle data that was read from the local disk (as opposed to from a remote executor).
*/
private var _localBytesRead: Long = _
def localBytesRead: Long = _localBytesRead
private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
/**
* Total bytes fetched in the shuffle by this task (both remote and local).
*/
def totalBytesRead: Long = _remoteBytesRead + _localBytesRead
/**
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched
/**
* Total number of records read from the shuffle by this task
*/
private var _recordsRead: Long = _
def recordsRead: Long = _recordsRead
private[spark] def incRecordsRead(value: Long) = _recordsRead += value
private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
}
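A small sketch showing how the derived totals relate to the raw counters (the `inc*` mutators are `private[spark]`):

{{{
val read = new ShuffleReadMetrics
read.incRemoteBlocksFetched(2)
read.incLocalBlocksFetched(1)
read.incRemoteBytesRead(2048L)
read.incLocalBytesRead(512L)

// Derived getters combine the remote and local counters:
// read.totalBlocksFetched == 3
// read.totalBytesRead     == 2560
}}}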
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
* Metrics pertaining to shuffle data written in a given task.
*/
@DeveloperApi
class ShuffleWriteMetrics extends Serializable {
/**
* Number of bytes written for the shuffle by this task
*/
@volatile private var _shuffleBytesWritten: Long = _
def shuffleBytesWritten: Long = _shuffleBytesWritten
private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
/**
* Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
*/
@volatile private var _shuffleWriteTime: Long = _
def shuffleWriteTime: Long = _shuffleWriteTime
private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
/**
* Total number of records written to the shuffle by this task
*/
@volatile private var _shuffleRecordsWritten: Long = _
def shuffleRecordsWritten: Long = _shuffleRecordsWritten
private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
}
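And the write side; the `dec*` methods let a reverted partial write (e.g. an aborted spill) roll back its counts (again `private[spark]`):

{{{
val write = new ShuffleWriteMetrics
write.incShuffleBytesWritten(4096L)
write.incShuffleWriteTime(1500000L)      // nanoseconds spent blocking on the write
write.incShuffleRecordsWritten(100L)

// Rolling back part of a reverted write:
write.decShuffleBytesWritten(1024L)
// write.shuffleBytesWritten == 3072
}}}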
@@ -27,6 +27,7 @@ import org.apache.spark.executor.DataReadMethod.DataReadMethod
import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.util.Utils
/**
* :: DeveloperApi ::
* Metrics tracked during the execution of a task.
@@ -241,6 +242,7 @@ class TaskMetrics extends Serializable {
}
}
private[spark] object TaskMetrics {
private val hostNameCache = new ConcurrentHashMap[String, String]()
@@ -251,187 +253,3 @@ private[spark] object TaskMetrics {
if (canonicalHost != null) canonicalHost else host
}
}
/**
* :: DeveloperApi ::
* Method by which input data was read. Network means that the data was read over the network
* from a remote block manager (which may have stored the data on-disk or in-memory).
*/
@DeveloperApi
object DataReadMethod extends Enumeration with Serializable {
type DataReadMethod = Value
val Memory, Disk, Hadoop, Network = Value
}
/**
* :: DeveloperApi ::
* Method by which output data was written.
*/
@DeveloperApi
object DataWriteMethod extends Enumeration with Serializable {
type DataWriteMethod = Value
val Hadoop = Value
}
/**
* :: DeveloperApi ::
* Metrics about reading input data.
*/
@DeveloperApi
case class InputMetrics(readMethod: DataReadMethod.Value) {
/**
* This is volatile so that it is visible to the updater thread.
*/
@volatile @transient var bytesReadCallback: Option[() => Long] = None
/**
* Total bytes read.
*/
private var _bytesRead: Long = _
def bytesRead: Long = _bytesRead
def incBytesRead(bytes: Long): Unit = _bytesRead += bytes
/**
* Total records read.
*/
private var _recordsRead: Long = _
def recordsRead: Long = _recordsRead
def incRecordsRead(records: Long): Unit = _recordsRead += records
/**
* Invoke the bytesReadCallback and mutate bytesRead.
*/
def updateBytesRead() {
bytesReadCallback.foreach { c =>
_bytesRead = c()
}
}
/**
* Register a function that can be called to get up-to-date information on how many bytes the task
* has read from an input source.
*/
def setBytesReadCallback(f: Option[() => Long]) {
bytesReadCallback = f
}
}
/**
* :: DeveloperApi ::
* Metrics about writing output data.
*/
@DeveloperApi
case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
/**
* Total bytes written
*/
private var _bytesWritten: Long = _
def bytesWritten: Long = _bytesWritten
private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = value
/**
* Total records written
*/
private var _recordsWritten: Long = 0L
def recordsWritten: Long = _recordsWritten
private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value
}
/**
* :: DeveloperApi ::
* Metrics pertaining to shuffle data read in a given task.
*/
@DeveloperApi
class ShuffleReadMetrics extends Serializable {
/**
* Number of remote blocks fetched in this shuffle by this task
*/
private var _remoteBlocksFetched: Int = _
def remoteBlocksFetched: Int = _remoteBlocksFetched
private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
/**
* Number of local blocks fetched in this shuffle by this task
*/
private var _localBlocksFetched: Int = _
def localBlocksFetched: Int = _localBlocksFetched
private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
/**
* Time the task spent waiting for remote shuffle blocks. This only includes the time
* blocking on shuffle input data. For instance if block B is being fetched while the task is
* still not finished processing block A, it is not considered to be blocking on block B.
*/
private var _fetchWaitTime: Long = _
def fetchWaitTime: Long = _fetchWaitTime
private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
/**
* Total number of remote bytes read from the shuffle by this task
*/
private var _remoteBytesRead: Long = _
def remoteBytesRead: Long = _remoteBytesRead
private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
/**
* Shuffle data that was read from the local disk (as opposed to from a remote executor).
*/
private var _localBytesRead: Long = _
def localBytesRead: Long = _localBytesRead
private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
/**
* Total bytes fetched in the shuffle by this task (both remote and local).
*/
def totalBytesRead: Long = _remoteBytesRead + _localBytesRead
/**
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched
/**
* Total number of records read from the shuffle by this task
*/
private var _recordsRead: Long = _
def recordsRead: Long = _recordsRead
private[spark] def incRecordsRead(value: Long) = _recordsRead += value
private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
}
/**
* :: DeveloperApi ::
* Metrics pertaining to shuffle data written in a given task.
*/
@DeveloperApi
class ShuffleWriteMetrics extends Serializable {
/**
* Number of bytes written for the shuffle by this task
*/
@volatile private var _shuffleBytesWritten: Long = _
def shuffleBytesWritten: Long = _shuffleBytesWritten
private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
/**
* Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
*/
@volatile private var _shuffleWriteTime: Long = _
def shuffleWriteTime: Long = _shuffleWriteTime
private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
/**
* Total number of records written to the shuffle by this task
*/
@volatile private var _shuffleRecordsWritten: Long = _
def shuffleRecordsWritten: Long = _shuffleRecordsWritten
private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
}