From 7b71a0e09622e09285a9884ebb67b5fb1c5caa53 Mon Sep 17 00:00:00 2001
From: Kay Ousterhout <kayousterhout@gmail.com>
Date: Sun, 29 Jun 2014 22:01:42 -0700
Subject: [PATCH] [SPARK-1683] Track task read metrics.

This commit adds a new metric in TaskMetrics to record the input data size and displays this information in the UI. An earlier version of this commit also added the read time, which can be useful for diagnosing straggler problems, but unfortunately that change introduced a significant performance regression for jobs that don't do much computation. In order to track read time, we'll need to do sampling.

The screenshots below show the UI with the new "Input" field, which I added to the stage summary page, the executor summary page, and the per-stage page.

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #962 from kayousterhout/read_metrics and squashes the following commits:

f13b67d [Kay Ousterhout] Correctly format input bytes on executor page
8b70cde [Kay Ousterhout] Added comment about potential inaccuracy of bytesRead
d1016e8 [Kay Ousterhout] Udated SparkListenerSuite test
8461492 [Kay Ousterhout] Miniscule style fix
ae04d99 [Kay Ousterhout] Remove input metrics for parallel collections
719f19d [Kay Ousterhout] Style fixes
bb6ec62 [Kay Ousterhout] Small fixes
869ac7b [Kay Ousterhout] Updated Json tests
44a0301 [Kay Ousterhout] Fixed accidentally added line
4bd0568 [Kay Ousterhout] Added input source, renamed Hdfs to Hadoop.
f27e535 [Kay Ousterhout] Updates based on review comments and to fix rebase
bf41029 [Kay Ousterhout] Updated Json tests to pass
0fc33e0 [Kay Ousterhout] Added explicit backward compatibility test
4e52925 [Kay Ousterhout] Added Json output and associated tests.
365400b [Kay Ousterhout] [SPARK-1683] Track task read metrics.
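For context (not part of the patch itself): the sketch below shows one way a user-defined SparkListener could consume the new per-task input metrics once this change is applied. The listener name and the logging are illustrative; registration via SparkContext.addSparkListener is assumed.

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Illustrative listener that aggregates the new per-task input metrics.
    class InputBytesLogger extends SparkListener {
      private var totalBytesRead = 0L

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        val metrics = taskEnd.taskMetrics
        if (metrics != null) {
          // inputMetrics is None for tasks that read neither Hadoop input nor persisted blocks.
          metrics.inputMetrics.foreach { input =>
            totalBytesRead += input.bytesRead
            println(s"Task ${taskEnd.taskInfo.taskId} read ${input.bytesRead} bytes " +
              s"via ${input.readMethod}; total so far: $totalBytesRead")
          }
        }
      }
    }

Registered with sc.addSparkListener(new InputBytesLogger), this would report roughly the same numbers the new "Input" columns display in the UI.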
--- .../scala/org/apache/spark/CacheManager.scala | 10 +- .../apache/spark/executor/TaskMetrics.scala | 29 +++++ .../scala/org/apache/spark/rdd/BlockRDD.scala | 2 +- .../org/apache/spark/rdd/HadoopRDD.scala | 15 +++ .../org/apache/spark/rdd/NewHadoopRDD.scala | 13 +++ .../apache/spark/scheduler/JobLogger.scala | 15 ++- .../apache/spark/storage/BlockManager.scala | 63 ++++++----- .../apache/spark/storage/ThreadingTest.scala | 2 +- .../apache/spark/ui/exec/ExecutorsPage.scala | 4 + .../apache/spark/ui/exec/ExecutorsTab.scala | 5 + .../spark/ui/jobs/ExecutorSummary.scala | 1 + .../apache/spark/ui/jobs/ExecutorTable.scala | 2 + .../spark/ui/jobs/JobProgressListener.scala | 15 ++- .../org/apache/spark/ui/jobs/StagePage.scala | 41 +++++-- .../org/apache/spark/ui/jobs/StageTable.scala | 7 ++ .../org/apache/spark/util/JsonProtocol.scala | 20 +++- .../org/apache/spark/CacheManagerSuite.scala | 4 +- .../spark/scheduler/SparkListenerSuite.scala | 1 + .../spark/storage/BlockManagerSuite.scala | 84 +++++++++++---- .../apache/spark/util/JsonProtocolSuite.scala | 102 +++++++++++++++--- 20 files changed, 349 insertions(+), 86 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 3f667a4a0f..8f867686a0 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -19,6 +19,7 @@ package org.apache.spark import scala.collection.mutable.{ArrayBuffer, HashSet} +import org.apache.spark.executor.InputMetrics import org.apache.spark.rdd.RDD import org.apache.spark.storage._ @@ -41,9 +42,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { val key = RDDBlockId(rdd.id, partition.index) logDebug(s"Looking for partition $key") blockManager.get(key) match { - case Some(values) => + case Some(blockResult) => // Partition is already materialized, so just return its values - new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) + context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics) + new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]]) case None => // Acquire a lock for loading this partition @@ -110,7 +112,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { logInfo(s"Whoever was loading $id failed; we'll try it ourselves") loading.add(id) } - values.map(_.asInstanceOf[Iterator[T]]) + values.map(_.data.asInstanceOf[Iterator[T]]) } } } @@ -132,7 +134,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { * exceptions that can be avoided. */ updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true) blockManager.get(key) match { - case Some(v) => v.asInstanceOf[Iterator[T]] + case Some(v) => v.data.asInstanceOf[Iterator[T]] case None => logInfo(s"Failure to store $key") throw new BlockException(key, s"Block manager failed to return cached value for $key!") diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 350fd74173..ac73288442 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -66,6 +66,12 @@ class TaskMetrics extends Serializable { */ var diskBytesSpilled: Long = _ + /** + * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read + * are stored here. 
+ */ + var inputMetrics: Option[InputMetrics] = None + /** * If this task reads from shuffle output, metrics on getting shuffle data will be collected here */ @@ -87,6 +93,29 @@ private[spark] object TaskMetrics { def empty: TaskMetrics = new TaskMetrics } +/** + * :: DeveloperApi :: + * Method by which input data was read. Network means that the data was read over the network + * from a remote block manager (which may have stored the data on-disk or in-memory). + */ +@DeveloperApi +object DataReadMethod extends Enumeration with Serializable { + type DataReadMethod = Value + val Memory, Disk, Hadoop, Network = Value +} + +/** + * :: DeveloperApi :: + * Metrics about reading input data. + */ +@DeveloperApi +case class InputMetrics(readMethod: DataReadMethod.Value) { + /** + * Total bytes read. + */ + var bytesRead: Long = 0L +} + /** * :: DeveloperApi :: diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala index c64da8804d..2673ec2250 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala @@ -46,7 +46,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds val blockManager = SparkEnv.get.blockManager val blockId = split.asInstanceOf[BlockRDDPartition].blockId blockManager.get(blockId) match { - case Some(block) => block.asInstanceOf[Iterator[T]] + case Some(block) => block.data.asInstanceOf[Iterator[T]] case None => throw new Exception("Could not compute split, block " + blockId + " not found") } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 2aa111d600..98dcbf4e2d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -38,6 +38,7 @@ import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.executor.{DataReadMethod, InputMetrics} import org.apache.spark.util.NextIterator /** @@ -196,6 +197,20 @@ class HadoopRDD[K, V]( context.addOnCompleteCallback{ () => closeIfNeeded() } val key: K = reader.createKey() val value: V = reader.createValue() + + // Set the task input metrics. + val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) + try { + /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't + * always at record boundaries, so tasks may need to read into other splits to complete + * a record. 
*/ + inputMetrics.bytesRead = split.inputSplit.value.getLength() + } catch { + case e: java.io.IOException => + logWarning("Unable to get input size to set InputMetrics for task", e) + } + context.taskMetrics.inputMetrics = Some(inputMetrics) + override def getNext() = { try { finished = !reader.next(key, value) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index ac1ccc06f2..f2b3a64bf1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -31,6 +31,7 @@ import org.apache.spark.Logging import org.apache.spark.Partition import org.apache.spark.SerializableWritable import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark.executor.{DataReadMethod, InputMetrics} private[spark] class NewHadoopPartition( rddId: Int, @@ -112,6 +113,18 @@ class NewHadoopRDD[K, V]( split.serializableHadoopSplit.value, hadoopAttemptContext) reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) + val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) + try { + /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't + * always at record boundaries, so tasks may need to read into other splits to complete + * a record. */ + inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength() + } catch { + case e: Exception => + logWarning("Unable to get input split size in order to set task input bytes", e) + } + context.taskMetrics.inputMetrics = Some(inputMetrics) + // Register an on-task-completion callback to close the input stream. context.addOnCompleteCallback(() => close()) var havePair = false diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index a1e21cad48..47dd112f68 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -26,7 +26,9 @@ import scala.collection.mutable.HashMap import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{DataReadMethod, TaskMetrics} +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel /** * :: DeveloperApi :: @@ -160,7 +162,13 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime - val readMetrics = taskMetrics.shuffleReadMetrics match { + val inputMetrics = taskMetrics.inputMetrics match { + case Some(metrics) => + " READ_METHOD=" + metrics.readMethod.toString + + " INPUT_BYTES=" + metrics.bytesRead + case None => "" + } + val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match { case Some(metrics) => " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime + " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + @@ -174,7 +182,8 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten case None => "" } - stageLogInfo(stageId, status + info + executorRunTime + readMetrics + writeMetrics) + stageLogInfo(stageId, status + info + executorRunTime + inputMetrics + 
shuffleReadMetrics + + writeMetrics) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index d2f7baf928..0db0a5bc73 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -29,6 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props} import sun.nio.ch.DirectBuffer import org.apache.spark._ +import org.apache.spark.executor.{DataReadMethod, InputMetrics} import org.apache.spark.io.CompressionCodec import org.apache.spark.network._ import org.apache.spark.serializer.Serializer @@ -39,6 +40,15 @@ private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValu private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues +/* Class for returning a fetched block and associated metrics. */ +private[spark] class BlockResult( + val data: Iterator[Any], + readMethod: DataReadMethod.Value, + bytes: Long) { + val inputMetrics = new InputMetrics(readMethod) + inputMetrics.bytesRead = bytes +} + private[spark] class BlockManager( executorId: String, actorSystem: ActorSystem, @@ -334,9 +344,9 @@ private[spark] class BlockManager( /** * Get block from local block manager. */ - def getLocal(blockId: BlockId): Option[Iterator[Any]] = { + def getLocal(blockId: BlockId): Option[BlockResult] = { logDebug(s"Getting local block $blockId") - doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]] + doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]] } /** @@ -355,11 +365,11 @@ private[spark] class BlockManager( blockId, s"Block $blockId not found on disk, though it should be") } } else { - doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]] + doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] } } - private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = { + private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { val info = blockInfo.get(blockId).orNull if (info != null) { info.synchronized { @@ -386,14 +396,14 @@ private[spark] class BlockManager( // Look for the block in memory if (level.useMemory) { logDebug(s"Getting block $blockId from memory") - val result = if (asValues) { - memoryStore.getValues(blockId) + val result = if (asBlockResult) { + memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size)) } else { memoryStore.getBytes(blockId) } result match { case Some(values) => - return Some(values) + return result case None => logDebug(s"Block $blockId not found in memory") } @@ -405,10 +415,11 @@ private[spark] class BlockManager( if (tachyonStore.contains(blockId)) { tachyonStore.getBytes(blockId) match { case Some(bytes) => - if (!asValues) { + if (!asBlockResult) { return Some(bytes) } else { - return Some(dataDeserialize(blockId, bytes)) + return Some(new BlockResult( + dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size)) } case None => logDebug(s"Block $blockId not found in tachyon") @@ -429,14 +440,15 @@ private[spark] class BlockManager( if (!level.useMemory) { // If the block shouldn't be stored in memory, we can just return it - if (asValues) { - return Some(dataDeserialize(blockId, bytes)) + if (asBlockResult) { + return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk, + 
info.size)) } else { return Some(bytes) } } else { // Otherwise, we also have to store something in the memory store - if (!level.deserialized || !asValues) { + if (!level.deserialized || !asBlockResult) { /* We'll store the bytes in memory if the block's storage level includes * "memory serialized", or if it should be cached as objects in memory * but we only requested its serialized bytes. */ @@ -445,7 +457,7 @@ private[spark] class BlockManager( memoryStore.putBytes(blockId, copyForMemory, level) bytes.rewind() } - if (!asValues) { + if (!asBlockResult) { return Some(bytes) } else { val values = dataDeserialize(blockId, bytes) @@ -457,12 +469,12 @@ private[spark] class BlockManager( memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data match { case Left(values2) => - return Some(values2) + return Some(new BlockResult(values2, DataReadMethod.Disk, info.size)) case _ => - throw new SparkException("Memory store did not return an iterator") + throw new SparkException("Memory store did not return back an iterator") } } else { - return Some(values) + return Some(new BlockResult(values, DataReadMethod.Disk, info.size)) } } } @@ -477,9 +489,9 @@ private[spark] class BlockManager( /** * Get block from remote block managers. */ - def getRemote(blockId: BlockId): Option[Iterator[Any]] = { + def getRemote(blockId: BlockId): Option[BlockResult] = { logDebug(s"Getting remote block $blockId") - doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]] + doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]] } /** @@ -487,10 +499,10 @@ private[spark] class BlockManager( */ def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = { logDebug(s"Getting remote block $blockId as bytes") - doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]] + doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] } - private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = { + private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { require(blockId != null, "BlockId is null") val locations = Random.shuffle(master.getLocations(blockId)) for (loc <- locations) { @@ -498,8 +510,11 @@ private[spark] class BlockManager( val data = BlockManagerWorker.syncGetBlock( GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) if (data != null) { - if (asValues) { - return Some(dataDeserialize(blockId, data)) + if (asBlockResult) { + return Some(new BlockResult( + dataDeserialize(blockId, data), + DataReadMethod.Network, + data.limit())) } else { return Some(data) } @@ -513,7 +528,7 @@ private[spark] class BlockManager( /** * Get a block from the block manager (either local or remote). */ - def get(blockId: BlockId): Option[Iterator[Any]] = { + def get(blockId: BlockId): Option[BlockResult] = { val local = getLocal(blockId) if (local.isDefined) { logInfo(s"Found block $blockId locally") @@ -792,7 +807,7 @@ private[spark] class BlockManager( * Read a block consisting of a single object. 
*/ def getSingle(blockId: BlockId): Option[Any] = { - get(blockId).map(_.next()) + get(blockId).map(_.data.next()) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala index a107c5182b..328be158db 100644 --- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala @@ -78,7 +78,7 @@ private[spark] object ThreadingTest { val startTime = System.currentTimeMillis() manager.get(blockId) match { case Some(retrievedBlock) => - assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, + assert(retrievedBlock.data.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match") println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms") diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 6cfc46c7e7..9625337ae2 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -72,6 +72,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { "Complete Tasks", "Total Tasks", "Task Time", + "Input Bytes", "Shuffle Read", "Shuffle Write") @@ -97,6 +98,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { <td>{values("Complete Tasks")}</td> <td>{values("Total Tasks")}</td> <td sorttable_customkey={values("Task Time")}>{Utils.msDurationToString(values("Task Time").toLong)}</td> + <td sorttable_customkey={values("Input Bytes")}>{Utils.bytesToString(values("Input Bytes").toLong)}</td> <td sorttable_customkey={values("Shuffle Read")}>{Utils.bytesToString(values("Shuffle Read").toLong)}</td> <td sorttable_customkey={values("Shuffle Write")} >{Utils.bytesToString(values("Shuffle Write").toLong)}</td> </tr> @@ -119,6 +121,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0) val totalTasks = activeTasks + failedTasks + completedTasks val totalDuration = listener.executorToDuration.getOrElse(execId, 0) + val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0) val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0) val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0) @@ -136,6 +139,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { completedTasks, totalTasks, totalDuration, + totalInputBytes, totalShuffleRead, totalShuffleWrite, maxMem diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 91d37b835b..58eeb86bf9 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -46,6 +46,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) val executorToTasksComplete = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]() val executorToDuration = HashMap[String, Long]() + val executorToInputBytes = HashMap[String, Long]() val executorToShuffleRead = HashMap[String, Long]() val executorToShuffleWrite = HashMap[String, Long]() @@ -72,6 +73,10 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) // Update shuffle 
read/write val metrics = taskEnd.taskMetrics if (metrics != null) { + metrics.inputMetrics.foreach { inputMetrics => + executorToInputBytes(eid) = + executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead + } metrics.shuffleReadMetrics.foreach { shuffleRead => executorToShuffleRead(eid) = executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala index 2aaf6329b7..c4a8996c0b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala @@ -28,6 +28,7 @@ class ExecutorSummary { var taskTime : Long = 0 var failedTasks : Int = 0 var succeededTasks : Int = 0 + var inputBytes: Long = 0 var shuffleRead : Long = 0 var shuffleWrite : Long = 0 var memoryBytesSpilled : Long = 0 diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index add0e9878a..2a34a9af92 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -43,6 +43,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { <th>Total Tasks</th> <th>Failed Tasks</th> <th>Succeeded Tasks</th> + <th>Input Bytes</th> <th>Shuffle Read</th> <th>Shuffle Write</th> <th>Shuffle Spill (Memory)</th> @@ -75,6 +76,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { <td>{v.failedTasks + v.succeededTasks}</td> <td>{v.failedTasks}</td> <td>{v.succeededTasks}</td> + <td sorttable_customekey={v.inputBytes.toString}>{Utils.bytesToString(v.inputBytes)}</td> <td sorttable_customekey={v.shuffleRead.toString}>{Utils.bytesToString(v.shuffleRead)}</td> <td sorttable_customekey={v.shuffleWrite.toString}>{Utils.bytesToString(v.shuffleWrite)}</td> <td sorttable_customekey={v.memoryBytesSpilled.toString} >{Utils.bytesToString(v.memoryBytesSpilled)}</td> diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 381a5443df..2286a7f952 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -46,13 +46,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { val completedStages = ListBuffer[StageInfo]() val failedStages = ListBuffer[StageInfo]() - // Total metrics reflect metrics only for completed tasks - var totalTime = 0L - var totalShuffleRead = 0L - var totalShuffleWrite = 0L - // TODO: Should probably consolidate all following into a single hash map. 
val stageIdToTime = HashMap[Int, Long]() + val stageIdToInputBytes = HashMap[Int, Long]() val stageIdToShuffleRead = HashMap[Int, Long]() val stageIdToShuffleWrite = HashMap[Int, Long]() val stageIdToMemoryBytesSpilled = HashMap[Int, Long]() @@ -93,6 +89,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { val toRemove = math.max(retainedStages / 10, 1) stages.take(toRemove).foreach { s => stageIdToTime.remove(s.stageId) + stageIdToInputBytes.remove(s.stageId) stageIdToShuffleRead.remove(s.stageId) stageIdToShuffleWrite.remove(s.stageId) stageIdToMemoryBytesSpilled.remove(s.stageId) @@ -171,6 +168,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { val metrics = taskEnd.taskMetrics if (metrics != null) { + metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead } metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead } metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten } y.memoryBytesSpilled += metrics.memoryBytesSpilled @@ -200,18 +198,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener { stageIdToTime.getOrElseUpdate(sid, 0L) val time = metrics.map(_.executorRunTime).getOrElse(0L) stageIdToTime(sid) += time - totalTime += time + + stageIdToInputBytes.getOrElseUpdate(sid, 0L) + val inputBytes = metrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L) + stageIdToInputBytes(sid) += inputBytes stageIdToShuffleRead.getOrElseUpdate(sid, 0L) val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L) stageIdToShuffleRead(sid) += shuffleRead - totalShuffleRead += shuffleRead stageIdToShuffleWrite.getOrElseUpdate(sid, 0L) val shuffleWrite = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L) stageIdToShuffleWrite(sid) += shuffleWrite - totalShuffleWrite += shuffleWrite stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L) val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 8e3d5d1cd4..afb8ed754f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -48,6 +48,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime) val numCompleted = tasks.count(_.taskInfo.finished) + val inputBytes = listener.stageIdToInputBytes.getOrElse(stageId, 0L) + val hasInput = inputBytes > 0 val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L) val hasShuffleRead = shuffleReadBytes > 0 val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L) @@ -69,6 +71,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { <strong>Total task time across all tasks: </strong> {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)} </li> + {if (hasInput) + <li> + <strong>Input: </strong> + {Utils.bytesToString(inputBytes)} + </li> + } {if (hasShuffleRead) <li> <strong>Shuffle read: </strong> @@ -98,13 +106,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { Seq( "Index", "ID", "Attempt", "Status", "Locality Level", "Executor", "Launch Time", "Duration", "GC Time") ++ + {if (hasInput) Seq("Input") else Nil} ++ {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++ {if 
(hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++ {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++ Seq("Errors") val taskTable = UIUtils.listingTable( - taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks) + taskHeaders, taskRow(hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks) // Excludes tasks which failed and have incomplete metrics val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined) @@ -159,6 +168,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { def getQuantileCols(data: Seq[Double]) = Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong)) + val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble + } + val inputQuantiles = "Input" +: getQuantileCols(inputSizes) + val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble } @@ -186,6 +200,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { serviceQuantiles, gettingResultQuantiles, schedulerDelayQuantiles, + if (hasInput) inputQuantiles else Nil, if (hasShuffleRead) shuffleReadQuantiles else Nil, if (hasShuffleWrite) shuffleWriteQuantiles else Nil, if (hasBytesSpilled) memoryBytesSpilledQuantiles else Nil, @@ -209,8 +224,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { } } - def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean) - (taskData: TaskUIData): Seq[Node] = { + def taskRow( + hasInput: Boolean, + hasShuffleRead: Boolean, + hasShuffleWrite: Boolean, + hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = { taskData match { case TaskUIData(info, metrics, errorMessage) => val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis()) else metrics.map(_.executorRunTime).getOrElse(1L) @@ -219,6 +237,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) + val maybeInput = metrics.flatMap(_.inputMetrics) + val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("") + val inputReadable = maybeInput + .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})") + .getOrElse("") + val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead) val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("") @@ -265,12 +289,17 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""} </td> --> - {if (shuffleRead) { + {if (hasInput) { + <td sorttable_customkey={inputSortable}> + {inputReadable} + </td> + }} + {if (hasShuffleRead) { <td sorttable_customkey={shuffleReadSortable}> {shuffleReadReadable} </td> }} - {if (shuffleWrite) { + {if (hasShuffleWrite) { <td sorttable_customkey={writeTimeSortable}> {writeTimeReadable} </td> @@ -278,7 +307,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { {shuffleWriteReadable} </td> }} - {if (bytesSpilled) { + {if (hasBytesSpilled) { <td 
sorttable_customkey={memoryBytesSpilledSortable}> {memoryBytesSpilledReadable} </td> diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index b2b6cc6a6e..a9ac6d5bee 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -43,6 +43,7 @@ private[ui] class StageTableBase( <th>Submitted</th> <th>Duration</th> <th>Tasks: Succeeded/Total</th> + <th>Input</th> <th>Shuffle Read</th> <th>Shuffle Write</th> } @@ -123,6 +124,11 @@ private[ui] class StageTableBase( case _ => "" } val totalTasks = s.numTasks + val inputSortable = listener.stageIdToInputBytes.getOrElse(s.stageId, 0L) + val inputRead = inputSortable match { + case 0 => "" + case b => Utils.bytesToString(b) + } val shuffleReadSortable = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L) val shuffleRead = shuffleReadSortable match { case 0 => "" @@ -150,6 +156,7 @@ private[ui] class StageTableBase( <td class="progress-cell"> {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)} </td> + <td sorttable_customekey={inputSortable.toString}>{inputRead}</td> <td sorttable_customekey={shuffleReadSortable.toString}>{shuffleRead}</td> <td sorttable_customekey={shuffleWriteSortable.toString}>{shuffleWrite}</td> } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 6245b4b802..26c9c9d603 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -26,7 +26,8 @@ import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.JsonAST._ -import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics} +import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics, + ShuffleWriteMetrics, TaskMetrics} import org.apache.spark.scheduler._ import org.apache.spark.storage._ import org.apache.spark._ @@ -213,6 +214,8 @@ private[spark] object JsonProtocol { taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing) val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing) + val inputMetrics = + taskMetrics.inputMetrics.map(inputMetricsToJson).getOrElse(JNothing) val updatedBlocks = taskMetrics.updatedBlocks.map { blocks => JArray(blocks.toList.map { case (id, status) => @@ -230,6 +233,7 @@ private[spark] object JsonProtocol { ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~ ("Shuffle Read Metrics" -> shuffleReadMetrics) ~ ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~ + ("Input Metrics" -> inputMetrics) ~ ("Updated Blocks" -> updatedBlocks) } @@ -247,6 +251,11 @@ private[spark] object JsonProtocol { ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) } + def inputMetricsToJson(inputMetrics: InputMetrics): JValue = { + ("Data Read Method" -> inputMetrics.readMethod.toString) ~ + ("Bytes Read" -> inputMetrics.bytesRead) + } + def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = { val reason = Utils.getFormattedClassName(taskEndReason) val json = taskEndReason match { @@ -528,6 +537,8 @@ private[spark] object JsonProtocol { Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson) metrics.shuffleWriteMetrics = Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson) + 
metrics.inputMetrics = + Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson) metrics.updatedBlocks = Utils.jsonOption(json \ "Updated Blocks").map { value => value.extract[List[JValue]].map { block => @@ -557,6 +568,13 @@ private[spark] object JsonProtocol { metrics } + def inputMetricsFromJson(json: JValue): InputMetrics = { + val metrics = new InputMetrics( + DataReadMethod.withName((json \ "Data Read Method").extract[String])) + metrics.bytesRead = (json \ "Bytes Read").extract[Long] + metrics + } + def taskEndReasonFromJson(json: JValue): TaskEndReason = { val success = Utils.getFormattedClassName(Success) val resubmitted = Utils.getFormattedClassName(Resubmitted) diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index 4f178db40f..7f5d0b061e 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.scalatest.{BeforeAndAfter, FunSuite} import org.scalatest.mock.EasyMockSugar +import org.apache.spark.executor.{DataReadMethod, TaskMetrics} import org.apache.spark.rdd.RDD import org.apache.spark.storage._ @@ -66,7 +67,8 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar test("get cached rdd") { expecting { - blockManager.get(RDDBlockId(0, 0)).andReturn(Some(ArrayBuffer(5, 6, 7).iterator)) + val result = new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12) + blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result)) } whenExecuting(blockManager) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 6df0a08096..71f48e295e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -251,6 +251,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers taskInfoMetrics.foreach { case (taskInfo, taskMetrics) => taskMetrics.resultSize should be > (0l) if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) { + taskMetrics.inputMetrics should not be ('defined) taskMetrics.shuffleWriteMetrics should be ('defined) taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index d7dbe5164b..23cb6905bf 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -31,11 +31,13 @@ import org.scalatest.concurrent.Timeouts._ import org.scalatest.Matchers import org.scalatest.time.SpanSugar._ -import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf} +import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext} +import org.apache.spark.executor.DataReadMethod import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils} +import scala.collection.mutable.ArrayBuffer import scala.language.implicitConversions import scala.language.postfixOps @@ -415,6 +417,39 @@ class BlockManagerSuite 
extends FunSuite with Matchers with BeforeAndAfter } } + test("correct BlockResult returned from get() calls") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr, + mapOutputTracker) + val list1 = List(new Array[Byte](200), new Array[Byte](200)) + val list1ForSizeEstimate = new ArrayBuffer[Any] + list1ForSizeEstimate ++= list1.iterator + val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate) + val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150)) + val list2ForSizeEstimate = new ArrayBuffer[Any] + list2ForSizeEstimate ++= list2.iterator + val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate) + store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true) + val list1Get = store.get("list1") + assert(list1Get.isDefined, "list1 expected to be in store") + assert(list1Get.get.data.size === 2) + assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate) + assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory) + val list2MemoryGet = store.get("list2memory") + assert(list2MemoryGet.isDefined, "list2memory expected to be in store") + assert(list2MemoryGet.get.data.size === 3) + assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate) + assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory) + val list2DiskGet = store.get("list2disk") + assert(list2DiskGet.isDefined, "list2memory expected to be in store") + assert(list2DiskGet.get.data.size === 3) + System.out.println(list2DiskGet) + // We don't know the exact size of the data on disk, but it should certainly be > 0. 
+ assert(list2DiskGet.get.inputMetrics.bytesRead > 0) + assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk) + } + test("in-memory LRU storage") { store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr, mapOutputTracker) @@ -630,18 +665,18 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(store.get("list2").isDefined, "list2 was not in store") - assert(store.get("list2").get.size == 2) + assert(store.get("list2").get.data.size === 2) assert(store.get("list3").isDefined, "list3 was not in store") - assert(store.get("list3").get.size == 2) + assert(store.get("list3").get.data.size === 2) assert(store.get("list1") === None, "list1 was in store") assert(store.get("list2").isDefined, "list2 was not in store") - assert(store.get("list2").get.size == 2) + assert(store.get("list2").get.data.size === 2) // At this point list2 was gotten last, so LRU will getSingle rid of list3 store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) assert(store.get("list1").isDefined, "list1 was not in store") - assert(store.get("list1").get.size == 2) + assert(store.get("list1").get.data.size === 2) assert(store.get("list2").isDefined, "list2 was not in store") - assert(store.get("list2").get.size == 2) + assert(store.get("list2").get.data.size === 2) assert(store.get("list3") === None, "list1 was in store") } @@ -656,28 +691,31 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true) store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true) + val listForSizeEstimate = new ArrayBuffer[Any] + listForSizeEstimate ++= list1.iterator + val listSize = SizeEstimator.estimate(listForSizeEstimate) // At this point LRU should not kick in because list3 is only on disk - assert(store.get("list1").isDefined, "list2 was not in store") - assert(store.get("list1").get.size === 2) - assert(store.get("list2").isDefined, "list3 was not in store") - assert(store.get("list2").get.size === 2) - assert(store.get("list3").isDefined, "list1 was not in store") - assert(store.get("list3").get.size === 2) - assert(store.get("list1").isDefined, "list2 was not in store") - assert(store.get("list1").get.size === 2) - assert(store.get("list2").isDefined, "list3 was not in store") - assert(store.get("list2").get.size === 2) - assert(store.get("list3").isDefined, "list1 was not in store") - assert(store.get("list3").get.size === 2) + assert(store.get("list1").isDefined, "list1 was not in store") + assert(store.get("list1").get.data.size === 2) + assert(store.get("list2").isDefined, "list2 was not in store") + assert(store.get("list2").get.data.size === 2) + assert(store.get("list3").isDefined, "list3 was not in store") + assert(store.get("list3").get.data.size === 2) + assert(store.get("list1").isDefined, "list1 was not in store") + assert(store.get("list1").get.data.size === 2) + assert(store.get("list2").isDefined, "list2 was not in store") + assert(store.get("list2").get.data.size === 2) + assert(store.get("list3").isDefined, "list3 was not in store") + assert(store.get("list3").get.data.size === 2) // Now let's add in list4, which uses both disk and memory; list1 
should drop out store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true) assert(store.get("list1") === None, "list1 was in store") - assert(store.get("list2").isDefined, "list3 was not in store") - assert(store.get("list2").get.size === 2) - assert(store.get("list3").isDefined, "list1 was not in store") - assert(store.get("list3").get.size === 2) + assert(store.get("list2").isDefined, "list2 was not in store") + assert(store.get("list2").get.data.size === 2) + assert(store.get("list3").isDefined, "list3 was not in store") + assert(store.get("list3").get.data.size === 2) assert(store.get("list4").isDefined, "list4 was not in store") - assert(store.get("list4").get.size === 2) + assert(store.get("list4").get.data.size === 2) } test("negative byte values in ByteBufferInputStream") { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 6c49870455..316e14100e 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -39,7 +39,11 @@ class JsonProtocolSuite extends FunSuite { val taskGettingResult = SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true)) val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success, - makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800)) + makeTaskInfo(123L, 234, 67, 345L, false), + makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false)) + val taskEndWithHadoopInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success, + makeTaskInfo(123L, 234, 67, 345L, false), + makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true)) val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties) val jobEnd = SparkListenerJobEnd(20, JobSucceeded) val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]]( @@ -61,6 +65,7 @@ class JsonProtocolSuite extends FunSuite { testEvent(taskStart, taskStartJsonString) testEvent(taskGettingResult, taskGettingResultJsonString) testEvent(taskEnd, taskEndJsonString) + testEvent(taskEndWithHadoopInput, taskEndWithHadoopInputJsonString) testEvent(jobStart, jobStartJsonString) testEvent(jobEnd, jobEndJsonString) testEvent(environmentUpdate, environmentUpdateJsonString) @@ -75,7 +80,7 @@ class JsonProtocolSuite extends FunSuite { testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L)) testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L)) testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false)) - testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8)) + testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false)) testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000)) // StorageLevel @@ -118,7 +123,7 @@ class JsonProtocolSuite extends FunSuite { testBlockId(StreamBlockId(1, 2L)) } - test("Backward compatibility") { + test("StageInfo.details backward compatibility") { // StageInfo.details was added after 1.0.0. val info = makeStageInfo(1, 2, 3, 4L, 5L) assert(info.details.nonEmpty) @@ -129,6 +134,16 @@ class JsonProtocolSuite extends FunSuite { assert("" === newInfo.details) } + test("InputMetrics backward compatibility") { + // InputMetrics were added after 1.0.1. 
+ val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true) + assert(metrics.inputMetrics.nonEmpty) + val newJson = JsonProtocol.taskMetricsToJson(metrics) + val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" } + val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) + assert(newMetrics.inputMetrics.isEmpty) + } + /** -------------------------- * | Helper test running methods | @@ -294,6 +309,8 @@ class JsonProtocolSuite extends FunSuite { metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, assertShuffleReadEquals) assertOptionEquals( metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals) + assertOptionEquals( + metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals) assertOptionEquals(metrics1.updatedBlocks, metrics2.updatedBlocks, assertBlocksEquals) } @@ -311,6 +328,11 @@ class JsonProtocolSuite extends FunSuite { assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime) } + private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) { + assert(metrics1.readMethod === metrics2.readMethod) + assert(metrics1.bytesRead === metrics2.bytesRead) + } + private def assertEquals(bm1: BlockManagerId, bm2: BlockManagerId) { assert(bm1.executorId === bm2.executorId) assert(bm1.host === bm2.host) @@ -403,6 +425,10 @@ class JsonProtocolSuite extends FunSuite { assertEquals(w1, w2) } + private def assertInputMetricsEquals(i1: InputMetrics, i2: InputMetrics) { + assertEquals(i1, i2) + } + private def assertTaskMetricsEquals(t1: TaskMetrics, t2: TaskMetrics) { assertEquals(t1, t2) } @@ -460,9 +486,19 @@ class JsonProtocolSuite extends FunSuite { new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative) } - private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = { + /** + * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is + * set to true) or read data from a shuffle otherwise. 
+ */ + private def makeTaskMetrics( + a: Long, + b: Long, + c: Long, + d: Long, + e: Int, + f: Int, + hasHadoopInput: Boolean) = { val t = new TaskMetrics - val sr = new ShuffleReadMetrics val sw = new ShuffleWriteMetrics t.hostname = "localhost" t.executorDeserializeTime = a @@ -471,15 +507,23 @@ class JsonProtocolSuite extends FunSuite { t.jvmGCTime = d t.resultSerializationTime = a + b t.memoryBytesSpilled = a + c - sr.shuffleFinishTime = b + c - sr.totalBlocksFetched = e + f - sr.remoteBytesRead = b + d - sr.localBlocksFetched = e - sr.fetchWaitTime = a + d - sr.remoteBlocksFetched = f + + if (hasHadoopInput) { + val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) + inputMetrics.bytesRead = d + e + f + t.inputMetrics = Some(inputMetrics) + } else { + val sr = new ShuffleReadMetrics + sr.shuffleFinishTime = b + c + sr.totalBlocksFetched = e + f + sr.remoteBytesRead = b + d + sr.localBlocksFetched = e + sr.fetchWaitTime = a + d + sr.remoteBlocksFetched = f + t.shuffleReadMetrics = Some(sr) + } sw.shuffleBytesWritten = a + b + c sw.shuffleWriteTime = b + c + d - t.shuffleReadMetrics = Some(sr) t.shuffleWriteMetrics = Some(sw) // Make at most 6 blocks t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i => @@ -552,8 +596,9 @@ class JsonProtocolSuite extends FunSuite { | }, | "Shuffle Write Metrics":{ | "Shuffle Bytes Written":1200, - | "Shuffle Write Time":1500}, - | "Updated Blocks":[ + | "Shuffle Write Time":1500 + | }, + | "Updated Blocks":[ | {"Block ID":"rdd_0_0", | "Status":{ | "Storage Level":{ @@ -568,6 +613,35 @@ class JsonProtocolSuite extends FunSuite { |} """.stripMargin + private val taskEndWithHadoopInputJsonString = + """ + |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask", + |"Task End Reason":{"Reason":"Success"}, + |"Task Info":{ + | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor", + | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false, + | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0 + |}, + |"Task Metrics":{ + | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400, + | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700, + | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0, + | "Shuffle Write Metrics":{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500}, + | "Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100}, + | "Updated Blocks":[ + | {"Block ID":"rdd_0_0", + | "Status":{ + | "Storage Level":{ + | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false, + | "Replication":2 + | }, + | "Memory Size":0,"Tachyon Size":0,"Disk Size":0 + | } + | } + | ]} + |} + """ + private val jobStartJsonString = """ {"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties": -- GitLab
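As a usage note beyond the patch: event-log consumers can read the new block with json4s, the library JsonProtocol already uses. The sketch below is a hedged example under stated assumptions: the JSON fragment mirrors the taskEndWithHadoopInputJsonString fixture above, the field names come from inputMetricsToJson, and logs written before this change simply lack the "Input Metrics" field, so it is treated as optional.

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    object ReadInputMetricsFromEventLog {
      def main(args: Array[String]) {
        implicit val formats = DefaultFormats

        // "Task Metrics" fragment as emitted by JsonProtocol.inputMetricsToJson;
        // the byte count is just the value used in the test fixture.
        val taskMetricsJson = parse(
          """{"Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100}}""")

        // extractOpt returns None when the field is absent (logs from before this change).
        val bytesRead = (taskMetricsJson \ "Input Metrics" \ "Bytes Read").extractOpt[Long]
        val readMethod = (taskMetricsJson \ "Input Metrics" \ "Data Read Method").extractOpt[String]

        println(s"bytesRead=$bytesRead readMethod=$readMethod")
      }
    }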