Skip to content
Snippets Groups Projects
Commit ec30188a authored by Imran Rashid's avatar Imran Rashid
Browse files

rename remoteFetchWaitTime to fetchWaitTime, since it also includes time from local fetches

parent 9f0dc829
No related branches found
No related tags found
No related merge requests found
...@@ -55,7 +55,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin ...@@ -55,7 +55,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
val shuffleMetrics = new ShuffleReadMetrics val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleReadMillis = itr.getNetMillis shuffleMetrics.shuffleReadMillis = itr.getNetMillis
shuffleMetrics.remoteFetchTime = itr.remoteFetchTime shuffleMetrics.remoteFetchTime = itr.remoteFetchTime
shuffleMetrics.remoteFetchWaitTime = itr.remoteFetchWaitTime shuffleMetrics.fetchWaitTime = itr.fetchWaitTime
shuffleMetrics.remoteBytesRead = itr.remoteBytesRead shuffleMetrics.remoteBytesRead = itr.remoteBytesRead
shuffleMetrics.totalBlocksFetched = itr.totalBlocks shuffleMetrics.totalBlocksFetched = itr.totalBlocks
shuffleMetrics.localBlocksFetched = itr.numLocalBlocks shuffleMetrics.localBlocksFetched = itr.numLocalBlocks
......
...@@ -54,9 +54,9 @@ class ShuffleReadMetrics extends Serializable { ...@@ -54,9 +54,9 @@ class ShuffleReadMetrics extends Serializable {
var shuffleReadMillis: Long = _ var shuffleReadMillis: Long = _
/** /**
* Total time that is spent blocked waiting for shuffle to fetch remote data * Total time that is spent blocked waiting for shuffle to fetch data
*/ */
var remoteFetchWaitTime: Long = _ var fetchWaitTime: Long = _
/** /**
* The total amount of time for all the shuffle fetches. This adds up time from overlapping * The total amount of time for all the shuffle fetches. This adds up time from overlapping
......
...@@ -31,7 +31,7 @@ class StatsReportListener extends SparkListener with Logging { ...@@ -31,7 +31,7 @@ class StatsReportListener extends SparkListener with Logging {
showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten}) showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
//fetch & io //fetch & io
showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.remoteFetchWaitTime}) showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
showBytesDistribution("remote bytes read:", (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead}) showBytesDistribution("remote bytes read:", (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize)) showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))
...@@ -137,7 +137,7 @@ case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], othe ...@@ -137,7 +137,7 @@ case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], othe
object RuntimePercentage { object RuntimePercentage {
def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = { def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
val denom = totalTime.toDouble val denom = totalTime.toDouble
val fetchTime = metrics.shuffleReadMetrics.map{_.remoteFetchWaitTime} val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
val fetch = fetchTime.map{_ / denom} val fetch = fetchTime.map{_ / denom}
val exec = (metrics.executorRunTime - fetchTime.getOrElse(0l)) / denom val exec = (metrics.executorRunTime - fetchTime.getOrElse(0l)) / denom
val other = 1.0 - (exec + fetch.getOrElse(0d)) val other = 1.0 - (exec + fetch.getOrElse(0d))
......
...@@ -5,6 +5,6 @@ private[spark] trait BlockFetchTracker { ...@@ -5,6 +5,6 @@ private[spark] trait BlockFetchTracker {
def numLocalBlocks: Int def numLocalBlocks: Int
def numRemoteBlocks: Int def numRemoteBlocks: Int
def remoteFetchTime : Long def remoteFetchTime : Long
def remoteFetchWaitTime: Long def fetchWaitTime: Long
def remoteBytesRead : Long def remoteBytesRead : Long
} }
...@@ -903,7 +903,7 @@ class BlockFetcherIterator( ...@@ -903,7 +903,7 @@ class BlockFetcherIterator(
private var _remoteBytesRead = 0l private var _remoteBytesRead = 0l
private var _remoteFetchTime = 0l private var _remoteFetchTime = 0l
private var _remoteFetchWaitTime = 0l private var _fetchWaitTime = 0l
if (blocksByAddress == null) { if (blocksByAddress == null) {
throw new IllegalArgumentException("BlocksByAddress is null") throw new IllegalArgumentException("BlocksByAddress is null")
...@@ -1046,7 +1046,7 @@ class BlockFetcherIterator( ...@@ -1046,7 +1046,7 @@ class BlockFetcherIterator(
val startFetchWait = System.currentTimeMillis() val startFetchWait = System.currentTimeMillis()
val result = results.take() val result = results.take()
val stopFetchWait = System.currentTimeMillis() val stopFetchWait = System.currentTimeMillis()
_remoteFetchWaitTime += (stopFetchWait - startFetchWait) _fetchWaitTime += (stopFetchWait - startFetchWait)
bytesInFlight -= result.size bytesInFlight -= result.size
while (!fetchRequests.isEmpty && while (!fetchRequests.isEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
...@@ -1061,7 +1061,7 @@ class BlockFetcherIterator( ...@@ -1061,7 +1061,7 @@ class BlockFetcherIterator(
def numRemoteBlocks = remoteBlockIds.size def numRemoteBlocks = remoteBlockIds.size
def remoteFetchTime = _remoteFetchTime def remoteFetchTime = _remoteFetchTime
def remoteFetchWaitTime = _remoteFetchWaitTime def fetchWaitTime = _fetchWaitTime
def remoteBytesRead = _remoteBytesRead def remoteBytesRead = _remoteBytesRead
......
...@@ -7,6 +7,6 @@ private[spark] trait DelegateBlockFetchTracker extends BlockFetchTracker { ...@@ -7,6 +7,6 @@ private[spark] trait DelegateBlockFetchTracker extends BlockFetchTracker {
def numLocalBlocks = delegate.numLocalBlocks def numLocalBlocks = delegate.numLocalBlocks
def numRemoteBlocks = delegate.numRemoteBlocks def numRemoteBlocks = delegate.numRemoteBlocks
def remoteFetchTime = delegate.remoteFetchTime def remoteFetchTime = delegate.remoteFetchTime
def remoteFetchWaitTime = delegate.remoteFetchWaitTime def fetchWaitTime = delegate.fetchWaitTime
def remoteBytesRead = delegate.remoteBytesRead def remoteBytesRead = delegate.remoteBytesRead
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment