Commit 12252d1d authored by Andrew Or

[SPARK-13071] Coalescing HadoopRDD overwrites existing input metrics

This issue is causing tests to fail consistently in master with Hadoop 2.6 / 2.7. This is because for Hadoop 2.5+ we overwrite existing values of `InputMetrics#bytesRead` in each call to `HadoopRDD#compute`. In the case of coalesce, e.g.
```
sc.textFile(..., 4).coalesce(2).count()
```
we will call `compute` multiple times in the same task, overwriting `bytesRead` values from previous calls to `compute`.
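
The root cause is easiest to see in isolation: the per-thread statistics callback measures only the bytes read since the current `compute` started, and `setBytesRead` replaces the metric wholesale, so a second partition computed in the same task discards the first partition's count. A minimal sketch of the failure mode (simplified names, not Spark's actual internals):
```
// Stand-ins for InputMetrics#bytesRead and InputMetrics#setBytesRead.
var bytesRead = 0L
def setBytesRead(v: Long): Unit = { bytesRead = v }

// Each compute() gets a callback that returns only this partition's bytes,
// then records them by overwriting the metric.
def compute(partitionBytes: Long): Unit = {
  val getBytesRead = () => partitionBytes
  setBytesRead(getBytesRead())   // clobbers earlier partitions' bytes
}

compute(100L)   // bytesRead == 100
compute(150L)   // bytesRead == 150, but the task actually read 250 bytes
```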

For a regression test, see `InputOutputMetricsSuite.input metrics for old hadoop with coalesce`. I did not add a separate test for each affected code path because that is impossible without significant refactoring; there is a lot of existing duplicate code in this corner of Spark.
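
One way to express such a check is with a `SparkListener` that sums `bytesRead` across task-end events. A hypothetical sketch of that style of test (not the suite's actual code; `path` and the exact metrics accessors are assumptions, matching the Spark 2.x listener API where `taskMetrics.inputMetrics` is non-optional):
```
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var bytesRead = 0L
sc.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    bytesRead += taskEnd.taskMetrics.inputMetrics.bytesRead
  }
})

sc.textFile(path, 4).count()              // baseline read, no coalesce
val expected = bytesRead                  // real tests wait for the listener
                                          // bus to drain before reading this

bytesRead = 0L
sc.textFile(path, 4).coalesce(2).count()  // same data, fewer tasks
// Before this fix, later partitions overwrote earlier ones and the
// coalesced total came up short.
assert(bytesRead == expected)
```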

This was caused by #10835.

Author: Andrew Or <andrew@databricks.com>

Closes #10973 from andrewor14/fix-input-metrics-coalesce.
parent 70e69fc4
@@ -215,6 +215,7 @@ class HadoopRDD[K, V](
       // TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD
 
       val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+      val existingBytesRead = inputMetrics.bytesRead
 
       // Sets the thread local variable for the file's name
       split.inputSplit.value match {
@@ -230,9 +231,13 @@ class HadoopRDD[K, V](
         case _ => None
       }
 
       // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+      // task and in the same thread, in which case we need to avoid overriding values written by
+      // previous partitions (SPARK-13071).
       def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
-          inputMetrics.setBytesRead(getBytesRead())
+          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
......
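
The same two-line change repeats in `NewHadoopRDD` and `SqlNewHadoopRDD` below: snapshot `bytesRead` at the start of `compute`, then add the callback's per-partition delta to that snapshot instead of overwriting. Continuing the earlier simplified sketch, the corrected pattern looks like this:
```
var bytesRead = 0L
def setBytesRead(v: Long): Unit = { bytesRead = v }

def compute(partitionBytes: Long): Unit = {
  val existingBytesRead = bytesRead        // bytes recorded by prior partitions
  val getBytesRead = () => partitionBytes  // this partition's bytes only
  setBytesRead(existingBytesRead + getBytesRead())  // extend, don't overwrite
}

compute(100L)   // bytesRead == 100
compute(150L)   // bytesRead == 250, both partitions counted
```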
@@ -130,6 +130,7 @@ class NewHadoopRDD[K, V](
       val conf = getConf
 
       val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+      val existingBytesRead = inputMetrics.bytesRead
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
@@ -139,9 +140,13 @@ class NewHadoopRDD[K, V](
         case _ => None
       }
 
       // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+      // task and in the same thread, in which case we need to avoid overriding values written by
+      // previous partitions (SPARK-13071).
       def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
-          inputMetrics.setBytesRead(getBytesRead())
+          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
......
@@ -127,6 +127,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
       val conf = getConf(isDriverSide = false)
 
       val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
+      val existingBytesRead = inputMetrics.bytesRead
 
       // Sets the thread local variable for the file's name
       split.serializableHadoopSplit.value match {
@@ -142,9 +143,13 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
         case _ => None
       }
 
       // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
+      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
+      // task and in the same thread, in which case we need to avoid overriding values written by
+      // previous partitions (SPARK-13071).
       def updateBytesRead(): Unit = {
         getBytesReadCallback.foreach { getBytesRead =>
-          inputMetrics.setBytesRead(getBytesRead())
+          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
         }
       }
......