diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 5fd90d0d888d942708be6816a4b298d008024e1f..cda3a95c84f22ca0af7ab10268ff01959213d2ca 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -34,7 +34,8 @@ case class Aggregator[K, V, C] (
   private val sparkConf = SparkEnv.get.conf
   private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
 
-  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext) : Iterator[(K, C)] = {
+  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
+      context: TaskContext) : Iterator[(K, C)] = {
     if (!externalSorting) {
       val combiners = new AppendOnlyMap[K,C]
       var kv: Product2[K, V] = null
@@ -52,7 +53,8 @@ case class Aggregator[K, V, C] (
         val (k, v) = iter.next()
         combiners.insert(k, v)
       }
-      context.taskMetrics.bytesSpilled = combiners.bytesSpilled
+      context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
+      context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
       combiners.iterator
     }
   }
@@ -75,7 +77,8 @@ case class Aggregator[K, V, C] (
         val (k, c) = iter.next()
         combiners.insert(k, c)
       }
-      context.taskMetrics.bytesSpilled = combiners.bytesSpilled
+      context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
+      context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
       combiners.iterator
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 44a15549cb09bf91586f0f228de6616380654dbf..0c8f4662a5f3a945bb1ea93b4870e42d8c9dcf3e 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -49,9 +49,14 @@ class TaskMetrics extends Serializable {
   var resultSerializationTime: Long = _
 
   /**
-   * The number of bytes spilled to disk by this task
+   * The number of in-memory bytes spilled by this task
    */
-  var bytesSpilled: Long = _
+  var memoryBytesSpilled: Long = _
+
+  /**
+   * The number of on-disk bytes spilled by this task
+   */
+  var diskBytesSpilled: Long = _
 
   /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 656c3efa9543a536205cfc67e7ca553eddca746a..9c6b308804c77da8f48b3bbc9241b491079d6c46 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -151,7 +151,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
           map.insert(kv._1, new CoGroupValue(kv._2, depNum))
         }
       }
-      context.taskMetrics.bytesSpilled = map.bytesSpilled
+      context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
+      context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
       new InterruptibleIterator(context, map.iterator)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 3f9cc97252cc39c8bd9f4d7ae09b3575f491f026..64e22a30b48f9380c47932d82e04d877d5f554f4 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -24,5 +24,6 @@ private[spark] class ExecutorSummary {
   var succeededTasks : Int = 0
   var shuffleRead : Long = 0
   var shuffleWrite : Long = 0
-  var bytesSpilled : Long = 0
+  var memoryBytesSpilled : Long = 0
+  var diskBytesSpilled : Long = 0
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 2522fba8e3f589ded7363459c8d227d1495cfabc..7b7325322a242449bf3908fdc24123b16105447d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -48,7 +48,8 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
         <th>Succeeded Tasks</th>
         <th>Shuffle Read</th>
         <th>Shuffle Write</th>
-        <th>Bytes Spilled</th>
+        <th>Bytes Spilled (Memory)</th>
+        <th>Bytes Spilled (Disk)</th>
       </thead>
       <tbody>
         {createExecutorTable()}
@@ -81,7 +82,8 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
             <td>{v.succeededTasks}</td>
             <td>{Utils.bytesToString(v.shuffleRead)}</td>
             <td>{Utils.bytesToString(v.shuffleWrite)}</td>
-            <td>{Utils.bytesToString(v.bytesSpilled)}</td>
+            <td>{Utils.bytesToString(v.memoryBytesSpilled)}</td>
+            <td>{Utils.bytesToString(v.diskBytesSpilled)}</td>
           </tr>
         }
       }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index f2c8658f981d3eafb02ef1f4ac38497b34173eb4..858a10ce750fffb8136637edbb512d4ecc7491c6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -52,7 +52,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   val stageIdToTime = HashMap[Int, Long]()
   val stageIdToShuffleRead = HashMap[Int, Long]()
   val stageIdToShuffleWrite = HashMap[Int, Long]()
-  val stageIdToBytesSpilled = HashMap[Int, Long]()
+  val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
+  val stageIdToDiskBytesSpilled = HashMap[Int, Long]()
   val stageIdToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
   val stageIdToTasksComplete = HashMap[Int, Int]()
   val stageIdToTasksFailed = HashMap[Int, Int]()
@@ -79,7 +80,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
       stageIdToTime.remove(s.stageId)
       stageIdToShuffleRead.remove(s.stageId)
       stageIdToShuffleWrite.remove(s.stageId)
-      stageIdToBytesSpilled.remove(s.stageId)
+      stageIdToMemoryBytesSpilled.remove(s.stageId)
+      stageIdToDiskBytesSpilled.remove(s.stageId)
       stageIdToTasksActive.remove(s.stageId)
       stageIdToTasksComplete.remove(s.stageId)
       stageIdToTasksFailed.remove(s.stageId)
@@ -151,7 +153,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
           Option(taskEnd.taskMetrics).foreach { taskMetrics =>
             taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
             taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
-            y.bytesSpilled += taskMetrics.bytesSpilled
+            y.memoryBytesSpilled += taskMetrics.memoryBytesSpilled
+            y.diskBytesSpilled += taskMetrics.diskBytesSpilled
           }
         }
       case _ => {}
@@ -187,9 +190,13 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
     stageIdToShuffleWrite(sid) += shuffleWrite
     totalShuffleWrite += shuffleWrite
 
-    stageIdToBytesSpilled.getOrElseUpdate(sid, 0L)
-    val bytesSpilled = metrics.map(m => m.bytesSpilled).getOrElse(0L)
-    stageIdToBytesSpilled(sid) += bytesSpilled
+    stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
+    val memoryBytesSpilled = metrics.map(m => m.memoryBytesSpilled).getOrElse(0L)
+    stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled
+
+    stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
+    val diskBytesSpilled = metrics.map(m => m.diskBytesSpilled).getOrElse(0L)
+    stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
 
     val taskList = stageIdToTaskInfos.getOrElse(
       sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index bb104318b06bf52e090c2f8762150c94dca8df23..8f89fad2fd75b5c328b79e0af821da40013efce7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -56,8 +56,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
       val hasShuffleRead = shuffleReadBytes > 0
       val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
       val hasShuffleWrite = shuffleWriteBytes > 0
-      val bytesSpilled = listener.stageIdToBytesSpilled.getOrElse(stageId, 0L)
-      val hasBytesSpilled = bytesSpilled > 0
+      val memoryBytesSpilled = listener.stageIdToMemoryBytesSpilled.getOrElse(stageId, 0L)
+      val diskBytesSpilled = listener.stageIdToDiskBytesSpilled.getOrElse(stageId, 0L)
+      val hasBytesSpilled = (memoryBytesSpilled > 0 && diskBytesSpilled > 0)
 
       var activeTime = 0L
       listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
@@ -85,8 +86,12 @@ private[spark] class StagePage(parent: JobProgressUI) {
             }
             {if (hasBytesSpilled)
               <li>
-                <strong>Bytes spilled: </strong>
-                {Utils.bytesToString(bytesSpilled)}
+                <strong>Bytes spilled (memory): </strong>
+                {Utils.bytesToString(memoryBytesSpilled)}
+              </li>
+              <li>
+                <strong>Bytes spilled (disk): </strong>
+                {Utils.bytesToString(diskBytesSpilled)}
               </li>
             }
           </ul>
@@ -97,7 +102,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
         Seq("Duration", "GC Time", "Result Ser Time") ++
         {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
         {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
-        {if (hasBytesSpilled) Seq("Bytes Spilled") else Nil} ++
+        {if (hasBytesSpilled) Seq("Bytes Spilled (Memory)", "Bytes Spilled (Disk)") else Nil} ++
         Seq("Errors")
 
       val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
@@ -162,11 +167,19 @@ private[spark] class StagePage(parent: JobProgressUI) {
           }
           val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
 
-          val bytesSpilledSizes = validTasks.map {
+          val memoryBytesSpilledSizes = validTasks.map {
             case(info, metrics, exception) =>
-              metrics.get.bytesSpilled.toDouble
+              metrics.get.memoryBytesSpilled.toDouble
           }
-          val bytesSpilledQuantiles = "Bytes Spilled" +: getQuantileCols(bytesSpilledSizes)
+          val memoryBytesSpilledQuantiles = "Bytes spilled (memory)" +:
+            getQuantileCols(memoryBytesSpilledSizes)
+
+          val diskBytesSpilledSizes = validTasks.map {
+            case(info, metrics, exception) =>
+              metrics.get.diskBytesSpilled.toDouble
+          }
+          val diskBytesSpilledQuantiles = "Bytes spilled (disk)" +:
+            getQuantileCols(diskBytesSpilledSizes)
 
           val listings: Seq[Seq[String]] = Seq(
             serializationQuantiles,
@@ -175,7 +188,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
             schedulerDelayQuantiles,
             if (hasShuffleRead) shuffleReadQuantiles else Nil,
             if (hasShuffleWrite) shuffleWriteQuantiles else Nil,
-            if (hasBytesSpilled) bytesSpilledQuantiles else Nil)
+            if (hasBytesSpilled) memoryBytesSpilledQuantiles else Nil,
+            if (hasBytesSpilled) diskBytesSpilledQuantiles else Nil)
 
           val quantileHeaders = Seq("Metric", "Min", "25th percentile",
             "Median", "75th percentile", "Max")
@@ -220,9 +234,13 @@ private[spark] class StagePage(parent: JobProgressUI) {
     val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms =>
       if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("")
 
-    val maybeBytesSpilled = metrics.map{m => m.bytesSpilled}
-    val bytesSpilledSortable = maybeBytesSpilled.map(_.toString).getOrElse("")
-    val bytesSpilledReadable = maybeBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+    val maybeMemoryBytesSpilled = metrics.map{m => m.memoryBytesSpilled}
+    val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
+    val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
+
+    val maybeDiskBytesSpilled = metrics.map{m => m.diskBytesSpilled}
+    val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
+    val diskBytesSpilledReadable = maybeDiskBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
 
     <tr>
       <td>{info.index}</td>
@@ -254,8 +272,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
         </td>
       }}
       {if (bytesSpilled) {
-        <td sorttable_customkey={bytesSpilledSortable}>
-          {bytesSpilledReadable}
+        <td sorttable_customkey={memoryBytesSpilledSortable}>
+          {memoryBytesSpilledReadable}
+        </td>
+        <td sorttable_customkey={diskBytesSpilledSortable}>
+          {diskBytesSpilledReadable}
         </td>
       }}
       <td>{exception.map(e =>
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index f4e53c4d3e117caef24a42615f128bacad1367a6..c63f47cc45f519020922b87d05ffd8ccd7e48e93 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -86,7 +86,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   private var spillCount = 0
 
   // Number of bytes spilled in total
-  private var _bytesSpilled = 0L
+  private var _memoryBytesSpilled = 0L
+  private var _diskBytesSpilled = 0L
 
   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
   private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
@@ -153,6 +154,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       writer.commit()
     } finally {
       // Partial failures cannot be tolerated; do not revert partial writes
+      _diskBytesSpilled += writer.bytesWritten
       writer.close()
     }
     currentMap = new SizeTrackingAppendOnlyMap[K, C]
@@ -164,13 +166,11 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       shuffleMemoryMap(Thread.currentThread().getId) = 0
     }
     numPairsInMemory = 0
-    _bytesSpilled += mapSize
+    _memoryBytesSpilled += mapSize
   }
 
-  /**
-   * Register the total number of bytes spilled by this task
-   */
-  def bytesSpilled: Long = _bytesSpilled
+  def memoryBytesSpilled: Long = _memoryBytesSpilled
+  def diskBytesSpilled: Long = _diskBytesSpilled
 
   /**
   * Return an iterator that merges the in-memory map with the spilled maps.
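
Not part of the patch: a minimal, self-contained Scala sketch of the accounting split this change introduces, using hypothetical names (SpillAccounting, recordSpill, SpillAccountingExample). It mirrors what ExternalAppendOnlyMap.spill() now does: the estimated in-memory size of the spilled map and the serialized bytes actually written to disk are tracked separately, since the two figures can differ substantially once the data is serialized (and possibly compressed) on the way to disk.

// Illustrative sketch only -- these names are hypothetical and do not appear in the patch.
class SpillAccounting {
  // Estimated in-memory size of data that has been spilled (analogous to memoryBytesSpilled)
  private var _memoryBytesSpilled = 0L
  // Serialized bytes actually written to disk during spills (analogous to diskBytesSpilled)
  private var _diskBytesSpilled = 0L

  // inMemorySize: size estimate of the map before spilling; bytesWritten: what the disk writer reports
  def recordSpill(inMemorySize: Long, bytesWritten: Long): Unit = {
    _memoryBytesSpilled += inMemorySize
    _diskBytesSpilled += bytesWritten
  }

  def memoryBytesSpilled: Long = _memoryBytesSpilled
  def diskBytesSpilled: Long = _diskBytesSpilled
}

object SpillAccountingExample extends App {
  val acc = new SpillAccounting
  // e.g. a 64 MB in-memory map that serializes down to 20 MB on disk
  acc.recordSpill(inMemorySize = 64L * 1024 * 1024, bytesWritten = 20L * 1024 * 1024)
  println(s"memory spilled: ${acc.memoryBytesSpilled} B, disk spilled: ${acc.diskBytesSpilled} B")
}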