Commit a8e2ba77, authored by Holden Karau, committed by Andrew Or

[SPARK-13152][CORE] Fix task metrics deprecation warning

Make internal, non-deprecated versions of incBytesRead and incRecordsRead so that we do not generate unnecessary deprecation warnings in our own build.

Right now incBytesRead and incRecordsRead are marked as deprecated and intended for internal use only. We should add private[spark] versions that are not deprecated and switch to them internally, so the build output is not cluttered with warnings.
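A minimal sketch of the pattern, assuming a simplified class with a plain Long field in place of the real accumulators (MetricsSketch and _bytesRead are illustrative names only; the actual change, shown in the diff below, lives in InputMetrics):

package org.apache.spark

class MetricsSketch {
  // Hypothetical backing field standing in for the real accumulator.
  private var _bytesRead: Long = 0L

  // Public method: kept for compatibility, but flagged so callers outside
  // Spark get a deprecation warning.
  @deprecated("incrementing input metrics is for internal use only", "2.0.0")
  def incBytesRead(v: Long): Unit = _bytesRead += v

  // Non-deprecated twin, visible only to Spark's own packages, so internal
  // call sites compile without deprecation warnings.
  private[spark] def incBytesReadInternal(v: Long): Unit = _bytesRead += v

  def bytesRead: Long = _bytesRead
}

Once the deprecated public methods can finally be removed from the API, the *Internal variants can be dropped and the remaining methods made private[spark], as the code comment in the InputMetrics diff below notes.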

cc andrewor14 who did the initial deprecation

Author: Holden Karau <holden@us.ibm.com>

Closes #11056 from holdenk/SPARK-13152-fix-task-metrics-deprecation-warnings.
parent de091452
@@ -44,12 +44,12 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
       case Some(blockResult) =>
         // Partition is already materialized, so just return its values
         val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
-        existingMetrics.incBytesRead(blockResult.bytes)
+        existingMetrics.incBytesReadInternal(blockResult.bytes)
         val iter = blockResult.data.asInstanceOf[Iterator[T]]
         new InterruptibleIterator[T](context, iter) {
           override def next(): T = {
-            existingMetrics.incRecordsRead(1)
+            existingMetrics.incRecordsReadInternal(1)
             delegate.next()
           }
         }

@@ -81,10 +81,15 @@ class InputMetrics private (
    */
   def readMethod: DataReadMethod.Value = DataReadMethod.withName(_readMethod.localValue)
+  // Once incBytesRead & incRecordsRead are ready to be removed from the public API
+  // we can remove the internal versions and make the previous public API private.
+  // This has been done to suppress warnings when building.
   @deprecated("incrementing input metrics is for internal use only", "2.0.0")
   def incBytesRead(v: Long): Unit = _bytesRead.add(v)
+  private[spark] def incBytesReadInternal(v: Long): Unit = _bytesRead.add(v)
   @deprecated("incrementing input metrics is for internal use only", "2.0.0")
   def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
+  private[spark] def incRecordsReadInternal(v: Long): Unit = _recordsRead.add(v)
   private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
   private[spark] def setReadMethod(v: DataReadMethod.Value): Unit =
     _readMethod.setValue(v.toString)

@@ -260,7 +260,7 @@ class HadoopRDD[K, V](
             finished = true
         }
         if (!finished) {
-          inputMetrics.incRecordsRead(1)
+          inputMetrics.incRecordsReadInternal(1)
         }
         if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
           updateBytesRead()
@@ -292,7 +292,7 @@ class HadoopRDD[K, V](
         // If we can't get the bytes read from the FS stats, fall back to the split size,
         // which may be inaccurate.
         try {
-          inputMetrics.incBytesRead(split.inputSplit.value.getLength)
+          inputMetrics.incBytesReadInternal(split.inputSplit.value.getLength)
         } catch {
           case e: java.io.IOException =>
             logWarning("Unable to get input size to set InputMetrics for task", e)

@@ -188,7 +188,7 @@ class NewHadoopRDD[K, V](
         }
         havePair = false
         if (!finished) {
-          inputMetrics.incRecordsRead(1)
+          inputMetrics.incRecordsReadInternal(1)
         }
         if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
           updateBytesRead()
@@ -219,7 +219,7 @@ class NewHadoopRDD[K, V](
         // If we can't get the bytes read from the FS stats, fall back to the split size,
         // which may be inaccurate.
         try {
-          inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+          inputMetrics.incBytesReadInternal(split.serializableHadoopSplit.value.getLength)
         } catch {
           case e: java.io.IOException =>
             logWarning("Unable to get input size to set InputMetrics for task", e)

@@ -811,8 +811,8 @@ private[spark] object JsonProtocol {
     Utils.jsonOption(json \ "Input Metrics").foreach { inJson =>
       val readMethod = DataReadMethod.withName((inJson \ "Data Read Method").extract[String])
       val inputMetrics = metrics.registerInputMetrics(readMethod)
-      inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
-      inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
+      inputMetrics.incBytesReadInternal((inJson \ "Bytes Read").extract[Long])
+      inputMetrics.incRecordsReadInternal((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
     }
     // Updated blocks

@@ -214,7 +214,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
         }
         havePair = false
         if (!finished) {
-          inputMetrics.incRecordsRead(1)
+          inputMetrics.incRecordsReadInternal(1)
         }
         if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
           updateBytesRead()
@@ -246,7 +246,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
         // If we can't get the bytes read from the FS stats, fall back to the split size,
         // which may be inaccurate.
         try {
-          inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+          inputMetrics.incBytesReadInternal(split.serializableHadoopSplit.value.getLength)
         } catch {
           case e: java.io.IOException =>
             logWarning("Unable to get input size to set InputMetrics for task", e)