Commit cdbfd1e1 authored by Matei Zaharia

Merge pull request #516 from squito/fix_local_metrics

Fix local metrics
parents f9fa2add 8a11ac3d
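For orientation before the hunks: this PR renames `remoteFetchWaitTime` to `fetchWaitTime` across the shuffle-metrics classes and wires task metrics through the local scheduler. Below is a minimal sketch of how the renamed metric can be observed from a listener, modeled on the `SaveStageInfo` listener in the test added at the bottom of this diff; `FetchWaitReporter` is a hypothetical name, and the event and `StageInfo.taskInfos` shapes are assumed from the 0.7-era API as it appears here:

package spark.scheduler

import scala.collection.mutable

// Hypothetical listener: collects per-stage info and reports the renamed
// fetchWaitTime metric, following the API used by SparkListenerSuite below.
class FetchWaitReporter extends SparkListener {
  val stageInfos = mutable.Buffer[StageInfo]()

  def onStageCompleted(stage: StageCompleted) {
    stageInfos += stage.stageInfo
    // taskInfos pairs a TaskInfo with its TaskMetrics, as used in the test;
    // shuffleReadMetrics is an Option, so the generator skips non-shuffle tasks.
    for ((taskInfo, taskMetrics) <- stage.stageInfo.taskInfos;
         shuffle <- taskMetrics.shuffleReadMetrics) {
      println("task took " + taskInfo.duration + " ms, blocked on shuffle fetch for " +
        shuffle.fetchWaitTime + " ms")
    }
  }
}

It would be registered the same way the test registers its listeners: sc.addSparkListener(new FetchWaitReporter), alongside the built-in StatsReportListener.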
@@ -55,7 +55,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging
       val shuffleMetrics = new ShuffleReadMetrics
       shuffleMetrics.shuffleReadMillis = itr.getNetMillis
       shuffleMetrics.remoteFetchTime = itr.remoteFetchTime
-      shuffleMetrics.remoteFetchWaitTime = itr.remoteFetchWaitTime
+      shuffleMetrics.fetchWaitTime = itr.fetchWaitTime
       shuffleMetrics.remoteBytesRead = itr.remoteBytesRead
       shuffleMetrics.totalBlocksFetched = itr.totalBlocks
       shuffleMetrics.localBlocksFetched = itr.numLocalBlocks
...
@@ -54,9 +54,9 @@ class ShuffleReadMetrics extends Serializable {
   var shuffleReadMillis: Long = _

   /**
-   * Total time that is spent blocked waiting for shuffle to fetch remote data
+   * Total time that is spent blocked waiting for shuffle to fetch data
    */
-  var remoteFetchWaitTime: Long = _
+  var fetchWaitTime: Long = _

   /**
    * The total amount of time for all the shuffle fetches. This adds up time from overlapping
...
@@ -31,7 +31,7 @@ class StatsReportListener extends SparkListener with Logging {
   showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})

   //fetch & io
-  showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.remoteFetchWaitTime})
+  showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
   showBytesDistribution("remote bytes read:", (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
   showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))
@@ -137,7 +137,7 @@ case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)

 object RuntimePercentage {
   def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
     val denom = totalTime.toDouble
-    val fetchTime = metrics.shuffleReadMetrics.map{_.remoteFetchWaitTime}
+    val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
     val fetch = fetchTime.map{_ / denom}
     val exec = (metrics.executorRunTime - fetchTime.getOrElse(0l)) / denom
     val other = 1.0 - (exec + fetch.getOrElse(0d))
...
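To make the fraction arithmetic in RuntimePercentage.apply concrete, here is a small worked example with illustrative numbers (not taken from the source). Note that fetch wait happens inside executorRunTime, which is why it is subtracted back out of the executor share:

// Illustrative numbers only; mirrors RuntimePercentage.apply above.
val totalTime = 1000L                       // wall-clock task time
val executorRunTime = 700L                  // includes time blocked on shuffle fetch
val fetchTime: Option[Long] = Some(200L)    // ShuffleReadMetrics.fetchWaitTime

val denom = totalTime.toDouble
val fetch = fetchTime.map(_ / denom)                             // Some(0.2)
val exec  = (executorRunTime - fetchTime.getOrElse(0L)) / denom  // 0.5
val other = 1.0 - (exec + fetch.getOrElse(0.0))                  // 0.3 (scheduling, ser/deser, etc.)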
@@ -67,8 +67,10 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkContext)
         logInfo("Size of task " + idInJob + " is " + bytes.limit + " bytes")
         val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes)
         updateDependencies(taskFiles, taskJars)   // Download any files added with addFile
+        val deserStart = System.currentTimeMillis()
         val deserializedTask = ser.deserialize[Task[_]](
           taskBytes, Thread.currentThread.getContextClassLoader)
+        val deserTime = System.currentTimeMillis() - deserStart

         // Run it
         val result: Any = deserializedTask.run(attemptId)
@@ -77,15 +79,19 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkContext)
         // executor does. This is useful to catch serialization errors early
         // on in development (so when users move their local Spark programs
         // to the cluster, they don't get surprised by serialization errors).
-        val resultToReturn = ser.deserialize[Any](ser.serialize(result))
+        val serResult = ser.serialize(result)
+        deserializedTask.metrics.get.resultSize = serResult.limit()
+        val resultToReturn = ser.deserialize[Any](serResult)
         val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
           ser.serialize(Accumulators.values))
         logInfo("Finished " + task)
         info.markSuccessful()
+        deserializedTask.metrics.get.executorRunTime = info.duration.toInt  //close enough
+        deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt

         // If the threadpool has not already been shutdown, notify DAGScheduler
         if (!Thread.currentThread().isInterrupted)
-          listener.taskEnded(task, Success, resultToReturn, accumUpdates, info, null)
+          listener.taskEnded(task, Success, resultToReturn, accumUpdates, info, deserializedTask.metrics.getOrElse(null))
       } catch {
         case t: Throwable => {
           logError("Exception in task " + idInJob, t)
...
@@ -5,6 +5,6 @@ private[spark] trait BlockFetchTracker {
   def numLocalBlocks: Int
   def numRemoteBlocks: Int
   def remoteFetchTime : Long
-  def remoteFetchWaitTime: Long
+  def fetchWaitTime: Long
   def remoteBytesRead : Long
 }
@@ -903,7 +903,7 @@ class BlockFetcherIterator(
     private var _remoteBytesRead = 0l
     private var _remoteFetchTime = 0l
-    private var _remoteFetchWaitTime = 0l
+    private var _fetchWaitTime = 0l

     if (blocksByAddress == null) {
       throw new IllegalArgumentException("BlocksByAddress is null")
@@ -1046,7 +1046,7 @@ class BlockFetcherIterator(
       val startFetchWait = System.currentTimeMillis()
       val result = results.take()
       val stopFetchWait = System.currentTimeMillis()
-      _remoteFetchWaitTime += (stopFetchWait - startFetchWait)
+      _fetchWaitTime += (stopFetchWait - startFetchWait)
       bytesInFlight -= result.size
       while (!fetchRequests.isEmpty &&
         (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
@@ -1061,7 +1061,7 @@ class BlockFetcherIterator(
     def numRemoteBlocks = remoteBlockIds.size
     def remoteFetchTime = _remoteFetchTime
-    def remoteFetchWaitTime = _remoteFetchWaitTime
+    def fetchWaitTime = _fetchWaitTime
     def remoteBytesRead = _remoteBytesRead
...
@@ -7,6 +7,6 @@ private[spark] trait DelegateBlockFetchTracker extends BlockFetchTracker {
   def numLocalBlocks = delegate.numLocalBlocks
   def numRemoteBlocks = delegate.numRemoteBlocks
   def remoteFetchTime = delegate.remoteFetchTime
-  def remoteFetchWaitTime = delegate.remoteFetchWaitTime
+  def fetchWaitTime = delegate.fetchWaitTime
   def remoteBytesRead = delegate.remoteBytesRead
 }
package spark.scheduler

import scala.collection.mutable

import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers

import spark.{SparkContext, LocalSparkContext}
import spark.SparkContext._

class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {

  test("local metrics") {
    sc = new SparkContext("local[4]", "test")
    val listener = new SaveStageInfo
    sc.addSparkListener(listener)
    sc.addSparkListener(new StatsReportListener)

    //just to make sure some of the tasks take a noticeable amount of time
    val w = { i: Int =>
      if (i == 0)
        Thread.sleep(100)
      i
    }

    val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
    d.count
    listener.stageInfos.size should be (1)

    val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
    val d3 = d.map{i => w(i) -> (0 to (i % 5))}.setName("shuffle input 2")
    val d4 = d2.cogroup(d3, 64).map{case (k, (v1, v2)) => w(k) -> (v1.size, v2.size)}
    d4.setName("A Cogroup")
    d4.collectAsMap

    listener.stageInfos.size should be (4)
    listener.stageInfos.foreach { stageInfo =>
      //small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms
      checkNonZeroAvg(stageInfo.taskInfos.map{_._1.duration}, stageInfo + " duration")
      checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorRunTime.toLong}, stageInfo + " executorRunTime")
      checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong}, stageInfo + " executorDeserializeTime")
      if (stageInfo.stage.rdd.name == d4.name) {
        checkNonZeroAvg(stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime}, stageInfo + " fetchWaitTime")
      }

      stageInfo.taskInfos.foreach { case (taskInfo, taskMetrics) =>
        taskMetrics.resultSize should be > (0l)
        if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) {
          taskMetrics.shuffleWriteMetrics should be ('defined)
          taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
        }
        if (stageInfo.stage.rdd.name == d4.name) {
          taskMetrics.shuffleReadMetrics should be ('defined)
          val sm = taskMetrics.shuffleReadMetrics.get
          sm.totalBlocksFetched should be > (0)
          sm.shuffleReadMillis should be > (0l)
          sm.localBlocksFetched should be > (0)
          sm.remoteBlocksFetched should be (0)
          sm.remoteBytesRead should be (0l)
          sm.remoteFetchTime should be (0l)
        }
      }
    }
  }

  def checkNonZeroAvg(m: Traversable[Long], msg: String) {
    assert(m.sum / m.size.toDouble > 0.0, msg)
  }

  def isStage(stageInfo: StageInfo, rddNames: Set[String], excludedNames: Set[String]) = {
    val names = Set(stageInfo.stage.rdd.name) ++ stageInfo.stage.rdd.dependencies.map{_.rdd.name}
    !names.intersect(rddNames).isEmpty && names.intersect(excludedNames).isEmpty
  }

  class SaveStageInfo extends SparkListener {
    val stageInfos = mutable.Buffer[StageInfo]()
    def onStageCompleted(stage: StageCompleted) {
      stageInfos += stage.stageInfo
    }
  }
}