diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 202fba699ab266bc650be16bb95302523a147956..f45b463fb6f623161ef56bac329da16ebbc40cd0 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -69,11 +69,13 @@ case class FetchFailed(
     bmAddress: BlockManagerId,  // Note that bmAddress can be null
     shuffleId: Int,
     mapId: Int,
-    reduceId: Int)
+    reduceId: Int,
+    message: String)
   extends TaskFailedReason {
   override def toErrorString: String = {
     val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
-    s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId)"
+    s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, " +
+      s"message=\n$message\n)"
   }
 }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index af17b5d5d25717ff6d2be620b8f4db781fc02e6e..96114c0423a9e468f61a674f132bb5dfbe9d0ac4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1053,7 +1053,7 @@ class DAGScheduler(
         logInfo("Resubmitted " + task + ", so marking it as still running")
         stage.pendingTasks += task
 
-      case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
+      case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
         val failedStage = stageIdToStage(task.stageId)
         val mapStage = shuffleToMapStage(shuffleId)
 
@@ -1063,7 +1063,7 @@ class DAGScheduler(
         if (runningStages.contains(failedStage)) {
           logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
             s"due to a fetch failure from $mapStage (${mapStage.name})")
-          markStageAsFinished(failedStage, Some("Fetch failure"))
+          markStageAsFinished(failedStage, Some("Fetch failure: " + failureMessage))
           runningStages -= failedStage
         }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 54904bffdf10b81ee7435aeaddcffa6adf5d547e..4e3d9de540783962313557cf7ce2eb458441970b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -215,7 +215,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
         taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
                       " STAGE_ID=" + taskEnd.stageId
         stageLogInfo(taskEnd.stageId, taskStatus)
-      case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
+      case FetchFailed(bmAddress, shuffleId, mapId, reduceId, message) =>
         taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" +
                       taskEnd.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
                       mapId + " REDUCE_ID=" + reduceId
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
index 71c08e9d5a8c3cbcba47516e89a7abf91a96d8d1..0c1b6f4defdb34f0d967606e1610f293aacc8f8c 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -19,6 +19,7 @@ package org.apache.spark.shuffle
 
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.{FetchFailed, TaskEndReason}
+import org.apache.spark.util.Utils
 
 /**
  * Failed to fetch a shuffle block. The executor catches this exception and propagates it
@@ -30,13 +31,11 @@ private[spark] class FetchFailedException(
     bmAddress: BlockManagerId,
     shuffleId: Int,
     mapId: Int,
-    reduceId: Int)
-  extends Exception {
-
-  override def getMessage: String =
-    "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)
+    reduceId: Int,
+    message: String)
+  extends Exception(message) {
 
-  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
+  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
 }
 
 /**
@@ -46,7 +45,4 @@ private[spark] class MetadataFetchFailedException(
     shuffleId: Int,
     reduceId: Int,
     message: String)
-  extends FetchFailedException(null, shuffleId, -1, reduceId) {
-
-  override def getMessage: String = message
-}
+  extends FetchFailedException(null, shuffleId, -1, reduceId, message)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index f49917b7fe833f12beda6913a8010d3296515599..0d5247f4176d4d3e7677fbc22e0515045f670b43 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -19,12 +19,13 @@ package org.apache.spark.shuffle.hash
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
+import scala.util.{Failure, Success, Try}
 
 import org.apache.spark._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
-import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.{CompletionIterator, Utils}
 
 private[hash] object BlockStoreShuffleFetcher extends Logging {
   def fetch[T](
@@ -52,21 +53,22 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
         (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
     }
 
-    def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
+    def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {
       val blockId = blockPair._1
       val blockOption = blockPair._2
       blockOption match {
-        case Some(block) => {
+        case Success(block) => {
           block.asInstanceOf[Iterator[T]]
         }
-        case None => {
+        case Failure(e) => {
           blockId match {
             case ShuffleBlockId(shufId, mapId, _) =>
               val address = statuses(mapId.toInt)._1
-              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
+              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
+                Utils.exceptionString(e))
             case _ =>
               throw new SparkException(
-                "Failed to get block " + blockId + ", which is not a shuffle block")
+                "Failed to get block " + blockId + ", which is not a shuffle block", e)
           }
         }
       }
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index ee89c7e521f4e02627b76a47f64dd5807e291c1d..1e579187e4193e324f0223f39cb499b7f7cb43b5 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
 import java.util.concurrent.LinkedBlockingQueue
 
 import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
+import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.{Logging, TaskContext}
 import org.apache.spark.network.BlockTransferService
@@ -55,7 +56,7 @@ final class ShuffleBlockFetcherIterator(
     blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
     serializer: Serializer,
     maxBytesInFlight: Long)
-  extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
+  extends Iterator[(BlockId, Try[Iterator[Any]])] with Logging {
 
   import ShuffleBlockFetcherIterator._
 
@@ -118,16 +119,18 @@ final class ShuffleBlockFetcherIterator(
   private[this] def cleanup() {
     isZombie = true
     // Release the current buffer if necessary
-    if (currentResult != null && !currentResult.failed) {
-      currentResult.buf.release()
+    currentResult match {
+      case SuccessFetchResult(_, _, buf) => buf.release()
+      case _ =>
     }
 
     // Release buffers in the results queue
     val iter = results.iterator()
     while (iter.hasNext) {
       val result = iter.next()
-      if (!result.failed) {
-        result.buf.release()
+      result match {
+        case SuccessFetchResult(_, _, buf) => buf.release()
+        case _ =>
       }
     }
   }
@@ -151,7 +154,7 @@ final class ShuffleBlockFetcherIterator(
             // Increment the ref count because we need to pass this to a different thread.
             // This needs to be released after use.
             buf.retain()
-            results.put(new FetchResult(BlockId(blockId), sizeMap(blockId), buf))
+            results.put(new SuccessFetchResult(BlockId(blockId), sizeMap(blockId), buf))
             shuffleMetrics.remoteBytesRead += buf.size
             shuffleMetrics.remoteBlocksFetched += 1
           }
@@ -160,7 +163,7 @@ final class ShuffleBlockFetcherIterator(
 
         override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
           logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
-          results.put(new FetchResult(BlockId(blockId), -1, null))
+          results.put(new FailureFetchResult(BlockId(blockId), e))
         }
       }
     )
@@ -231,12 +234,12 @@ final class ShuffleBlockFetcherIterator(
         val buf = blockManager.getBlockData(blockId)
         shuffleMetrics.localBlocksFetched += 1
         buf.retain()
-        results.put(new FetchResult(blockId, 0, buf))
+        results.put(new SuccessFetchResult(blockId, 0, buf))
       } catch {
         case e: Exception =>
           // If we see an exception, stop immediately.
           logError(s"Error occurred while fetching local blocks", e)
-          results.put(new FetchResult(blockId, -1, null))
+          results.put(new FailureFetchResult(blockId, e))
           return
       }
     }
@@ -267,15 +270,17 @@ final class ShuffleBlockFetcherIterator(
 
   override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch
 
-  override def next(): (BlockId, Option[Iterator[Any]]) = {
+  override def next(): (BlockId, Try[Iterator[Any]]) = {
     numBlocksProcessed += 1
     val startFetchWait = System.currentTimeMillis()
     currentResult = results.take()
     val result = currentResult
     val stopFetchWait = System.currentTimeMillis()
     shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait)
-    if (!result.failed) {
-      bytesInFlight -= result.size
+
+    result match {
+      case SuccessFetchResult(_, size, _) => bytesInFlight -= size
+      case _ =>
     }
     // Send fetch requests up to maxBytesInFlight
     while (fetchRequests.nonEmpty &&
@@ -283,20 +288,21 @@ final class ShuffleBlockFetcherIterator(
       sendRequest(fetchRequests.dequeue())
     }
 
-    val iteratorOpt: Option[Iterator[Any]] = if (result.failed) {
-      None
-    } else {
-      val is = blockManager.wrapForCompression(result.blockId, result.buf.createInputStream())
-      val iter = serializer.newInstance().deserializeStream(is).asIterator
-      Some(CompletionIterator[Any, Iterator[Any]](iter, {
-        // Once the iterator is exhausted, release the buffer and set currentResult to null
-        // so we don't release it again in cleanup.
-        currentResult = null
-        result.buf.release()
-      }))
+    val iteratorTry: Try[Iterator[Any]] = result match {
+      case FailureFetchResult(_, e) => Failure(e)
+      case SuccessFetchResult(blockId, _, buf) => {
+        val is = blockManager.wrapForCompression(blockId, buf.createInputStream())
+        val iter = serializer.newInstance().deserializeStream(is).asIterator
+        Success(CompletionIterator[Any, Iterator[Any]](iter, {
+          // Once the iterator is exhausted, release the buffer and set currentResult to null
+          // so we don't release it again in cleanup.
+          currentResult = null
+          buf.release()
+        }))
+      }
     }
 
-    (result.blockId, iteratorOpt)
+    (result.blockId, iteratorTry)
   }
 }
 
@@ -315,14 +321,30 @@ object ShuffleBlockFetcherIterator {
   }
 
   /**
-   * Result of a fetch from a remote block. A failure is represented as size == -1.
+   * Result of a fetch from a remote block.
+   */
+  private[storage] sealed trait FetchResult {
+    val blockId: BlockId
+  }
+
+  /**
+   * Result of a fetch from a remote block successfully.
    * @param blockId block id
    * @param size estimated size of the block, used to calculate bytesInFlight.
-   *             Note that this is NOT the exact bytes. -1 if failure is present.
-   * @param buf [[ManagedBuffer]] for the content. null is error.
+   *             Note that this is NOT the exact bytes.
+   * @param buf [[ManagedBuffer]] for the content.
    */
-  case class FetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer) {
-    def failed: Boolean = size == -1
-    if (failed) assert(buf == null) else assert(buf != null)
+  private[storage] case class SuccessFetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer)
+    extends FetchResult {
+    require(buf != null)
+    require(size >= 0)
   }
+
+  /**
+   * Result of a fetch from a remote block unsuccessfully.
+   * @param blockId block id
+   * @param e the failure exception
+   */
+  private[storage] case class FailureFetchResult(blockId: BlockId, e: Throwable)
+    extends FetchResult
 }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 43c7fba06694adf39adffae81848956d16478bcf..f7ae1f7f334de2ce9643e44456feb2731e4ebd6b 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -279,7 +279,8 @@ private[spark] object JsonProtocol {
         ("Block Manager Address" -> blockManagerAddress) ~
         ("Shuffle ID" -> fetchFailed.shuffleId) ~
         ("Map ID" -> fetchFailed.mapId) ~
-        ("Reduce ID" -> fetchFailed.reduceId)
+        ("Reduce ID" -> fetchFailed.reduceId) ~
+        ("Message" -> fetchFailed.message)
       case exceptionFailure: ExceptionFailure =>
         val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
         val metrics = exceptionFailure.metrics.map(taskMetricsToJson).getOrElse(JNothing)
@@ -629,7 +630,9 @@ private[spark] object JsonProtocol {
         val shuffleId = (json \ "Shuffle ID").extract[Int]
         val mapId = (json \ "Map ID").extract[Int]
         val reduceId = (json \ "Reduce ID").extract[Int]
-        new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId)
+        val message = Utils.jsonOption(json \ "Message").map(_.extract[String])
+        new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId,
+          message.getOrElse("Unknown reason"))
       case `exceptionFailure` =>
         val className = (json \ "Class Name").extract[String]
         val description = (json \ "Description").extract[String]
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b402c5f334bb04a415f3fbd64a28144ad58e80ce..a33046d2040d8c1e049bae54b29bb63cf030d442 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1597,7 +1597,7 @@ private[spark] object Utils extends Logging {
   }
 
   /** Return a nice string representation of the exception, including the stack trace. */
-  def exceptionString(e: Exception): String = {
+  def exceptionString(e: Throwable): String = {
     if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index a2e4f712db55b1c7807822c7b52b8babac92e055..819f95634bcdcf90c91c914d1075ceb5b9835e3d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -431,7 +431,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     // the 2nd ResultTask failed
     complete(taskSets(1), Seq(
         (Success, 42),
-        (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null)))
+        (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
     // this will get called
     // blockManagerMaster.removeExecutor("exec-hostA")
     // ask the scheduler to try it again
@@ -461,7 +461,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     // The first result task fails, with a fetch failure for the output from the first mapper.
     runEvent(CompletionEvent(
       taskSets(1).tasks(0),
-      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
       null,
       Map[Long, Any](),
       null,
@@ -472,7 +472,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     // The second ResultTask fails, with a fetch failure for the output from the second mapper.
     runEvent(CompletionEvent(
       taskSets(1).tasks(0),
-      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
       null,
       Map[Long, Any](),
       null,
@@ -624,7 +624,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
         (Success, makeMapStatus("hostC", 1))))
     // fail the third stage because hostA went down
     complete(taskSets(2), Seq(
-        (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
+        (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
     // TODO assert this:
     // blockManagerMaster.removeExecutor("exec-hostA")
     // have DAGScheduler try again
@@ -655,7 +655,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
         (Success, makeMapStatus("hostB", 1))))
     // pretend stage 0 failed because hostA went down
     complete(taskSets(2), Seq(
-        (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
+        (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
     // TODO assert this:
     // blockManagerMaster.removeExecutor("exec-hostA")
     // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 28f766570e96f235b968a760dc41fc2cf0af6b81..1eaabb93adbed33488a9dbeb32092518e57f8f4b 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -102,7 +102,7 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
     for (i <- 0 until 5) {
       assert(iterator.hasNext, s"iterator should have 5 elements but actually has $i elements")
       val (blockId, subIterator) = iterator.next()
-      assert(subIterator.isDefined,
+      assert(subIterator.isSuccess,
         s"iterator should have 5 elements defined but actually has $i elements")
 
       // Make sure we release the buffer once the iterator is exhausted.
@@ -230,8 +230,8 @@ class ShuffleBlockFetcherIteratorSuite extends FunSuite {
     sem.acquire()
 
     // The first block should be defined, and the last two are not defined (due to failure)
-    assert(iterator.next()._2.isDefined === true)
-    assert(iterator.next()._2.isDefined === false)
-    assert(iterator.next()._2.isDefined === false)
+    assert(iterator.next()._2.isSuccess)
+    assert(iterator.next()._2.isFailure)
+    assert(iterator.next()._2.isFailure)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 6567c5ab836e75b85e838fbea55271ff81ca7c56..2efbae689771a5da1a3e1951e697bc1fee5d4307 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -115,7 +115,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     // Go through all the failure cases to make sure we are counting them as failures.
     val taskFailedReasons = Seq(
       Resubmitted,
-      new FetchFailed(null, 0, 0, 0),
+      new FetchFailed(null, 0, 0, 0, "ignored"),
       new ExceptionFailure("Exception", "description", null, None),
       TaskResultLost,
       TaskKilled,
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index d235d7a0ed839e87704cb7c2da7f728c4767dc71..a91c9ddeaef3656ae92735ab33abc53115649da2 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -107,7 +107,8 @@ class JsonProtocolSuite extends FunSuite {
     testJobResult(jobFailed)
 
     // TaskEndReason
-    val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19)
+    val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
+      "Some exception")
     val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None)
     testTaskEndReason(Success)
     testTaskEndReason(Resubmitted)
@@ -396,6 +397,7 @@ class JsonProtocolSuite extends FunSuite {
         assert(r1.mapId === r2.mapId)
         assert(r1.reduceId === r2.reduceId)
         assertEquals(r1.bmAddress, r2.bmAddress)
+        assert(r1.message === r2.message)
       case (r1: ExceptionFailure, r2: ExceptionFailure) =>
         assert(r1.className === r2.className)
         assert(r1.description === r2.description)