From bf578deaf2493081ceeb78dfd7617def5699a06e Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@apache.org>
Date: Thu, 26 Jun 2014 21:12:16 -0700
Subject: [PATCH] Removed throwable field from FetchFailedException and added
 MetadataFetchFailedException

FetchFailedException used to have a Throwable field, but in reality we never propagate any of the throwable/exceptions back to the driver because Executor explicitly looks for FetchFailedException and then sends FetchFailed as the TaskEndReason.

This pull request removes the throwable and adds a MetadataFetchFailedException that extends FetchFailedException (so now MapOutputTracker throws MetadataFetchFailedException instead).

Author: Reynold Xin <rxin@apache.org>

Closes #1227 from rxin/metadataFetchException and squashes the following commits:

5cb1e0a [Reynold Xin] MetadataFetchFailedException extends FetchFailedException.
8861ee2 [Reynold Xin] Throw MetadataFetchFailedException in MapOutputTracker.
---
 .../org/apache/spark/MapOutputTracker.scala   | 12 +++---
 .../org/apache/spark/TaskEndReason.scala      |  2 +-
 .../org/apache/spark/executor/Executor.scala  |  2 +-
 .../spark/scheduler/TaskDescription.scala     |  4 ++
 .../{ => shuffle}/FetchFailedException.scala  | 43 +++++++++++--------
 .../hash/BlockStoreShuffleFetcher.scala       |  5 ++-
 .../apache/spark/MapOutputTrackerSuite.scala  |  1 +
 7 files changed, 42 insertions(+), 27 deletions(-)
 rename core/src/main/scala/org/apache/spark/{ => shuffle}/FetchFailedException.scala (50%)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ee82d9fa78..182abacc47 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -25,9 +25,11 @@ import scala.concurrent.Await
 
 import akka.actor._
 import akka.pattern.ask
+
+import org.apache.spark.util._
 import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util._
 
 private[spark] sealed trait MapOutputTrackerMessage
 private[spark] case class GetMapOutputStatuses(shuffleId: Int)
@@ -168,8 +170,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
           return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
         }
       } else {
-        throw new FetchFailedException(null, shuffleId, -1, reduceId,
-          new Exception("Missing all output locations for shuffle " + shuffleId))
+        throw new MetadataFetchFailedException(
+          shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
       }
     } else {
       statuses.synchronized {
@@ -371,8 +373,8 @@ private[spark] object MapOutputTracker {
     statuses.map {
       status =>
         if (status == null) {
-          throw new FetchFailedException(null, shuffleId, -1, reduceId,
-            new Exception("Missing an output location for shuffle " + shuffleId))
+          throw new MetadataFetchFailedException(
+            shuffleId, reduceId, "Missing an output location for shuffle " + shuffleId)
         } else {
           (status.location, decompressSize(status.compressedSizes(reduceId)))
         }
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 5e8bd8c8e5..df42d679b4 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -65,7 +65,7 @@ case object Resubmitted extends TaskFailedReason {
  */
 @DeveloperApi
 case class FetchFailed(
-    bmAddress: BlockManagerId,
+    bmAddress: BlockManagerId,  // Note that bmAddress can be null
     shuffleId: Int,
     mapId: Int,
     reduceId: Int)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 557b9a3f46..4d3ba11633 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,8 +26,8 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util.{AkkaUtils, Utils}
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
index 1481d70db4..4c96b9e5fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
@@ -21,6 +21,10 @@ import java.nio.ByteBuffer
 
 import org.apache.spark.util.SerializableBuffer
 
+/**
+ * Description of a task that gets passed onto executors to be executed, usually created by
+ * [[TaskSetManager.resourceOffer]].
+ */
 private[spark] class TaskDescription(
     val taskId: Long,
     val executorId: String,
diff --git a/core/src/main/scala/org/apache/spark/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
similarity index 50%
rename from core/src/main/scala/org/apache/spark/FetchFailedException.scala
rename to core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
index 8eaa26bdb1..71c08e9d5a 100644
--- a/core/src/main/scala/org/apache/spark/FetchFailedException.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala
@@ -15,31 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.spark
+package org.apache.spark.shuffle
 
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.{FetchFailed, TaskEndReason}
 
+/**
+ * Failed to fetch a shuffle block. The executor catches this exception and propagates it
+ * back to DAGScheduler (through TaskEndReason) so we'd resubmit the previous stage.
+ *
+ * Note that bmAddress can be null.
+ */
 private[spark] class FetchFailedException(
-    taskEndReason: TaskEndReason,
-    message: String,
-    cause: Throwable)
+    bmAddress: BlockManagerId,
+    shuffleId: Int,
+    mapId: Int,
+    reduceId: Int)
   extends Exception {
 
-  def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int,
-      cause: Throwable) =
-    this(FetchFailed(bmAddress, shuffleId, mapId, reduceId),
-      "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId),
-      cause)
-
-  def this (shuffleId: Int, reduceId: Int, cause: Throwable) =
-    this(FetchFailed(null, shuffleId, -1, reduceId),
-      "Unable to fetch locations from master: %d %d".format(shuffleId, reduceId), cause)
-
-  override def getMessage(): String = message
+  override def getMessage: String =
+    "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)
 
+  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId)
+}
 
-  override def getCause(): Throwable = cause
-
-  def toTaskEndReason: TaskEndReason = taskEndReason
+/**
+ * Failed to get shuffle metadata from [[org.apache.spark.MapOutputTracker]].
+ */
+private[spark] class MetadataFetchFailedException(
+    shuffleId: Int,
+    reduceId: Int,
+    message: String)
+  extends FetchFailedException(null, shuffleId, -1, reduceId) {
 
+  override def getMessage: String = message
 }
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index b05b6ea345..a932455776 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -20,11 +20,12 @@ package org.apache.spark.shuffle.hash
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 
+import org.apache.spark._
 import org.apache.spark.executor.ShuffleReadMetrics
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId}
 import org.apache.spark.util.CompletionIterator
-import org.apache.spark._
 
 private[hash] object BlockStoreShuffleFetcher extends Logging {
   def fetch[T](
@@ -63,7 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
           blockId match {
             case ShuffleBlockId(shufId, mapId, _) =>
               val address = statuses(mapId.toInt)._1
-              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
+              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
             case _ =>
               throw new SparkException(
                 "Failed to get block " + blockId + ", which is not a shuffle block")
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 95ba273f16..9702838085 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -24,6 +24,7 @@ import akka.testkit.TestActorRef
 import org.scalatest.FunSuite
 
 import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AkkaUtils
 
-- 
GitLab