From 273fb5cc109ac0a032f84c1566ae908cd0eb27b6 Mon Sep 17 00:00:00 2001
From: Charles Reiss <charles@eecs.berkeley.edu>
Date: Thu, 3 Jan 2013 14:09:56 -0800
Subject: [PATCH] Throw FetchFailedException for cached missing locs

---
 .../main/scala/spark/MapOutputTracker.scala   | 36 +++++++++++++------
 1 file changed, 26 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 70eb9f702e..9f2aa76830 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -139,8 +139,8 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
           case e: InterruptedException =>
         }
       }
-      return mapStatuses.get(shuffleId).map(status =>
-        (status.address, MapOutputTracker.decompressSize(status.compressedSizes(reduceId))))
+      return MapOutputTracker.convertMapStatuses(shuffleId, reduceId,
+        mapStatuses.get(shuffleId))
     } else {
       fetching += shuffleId
     }
@@ -156,21 +156,15 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
         fetchedStatuses = deserializeStatuses(fetchedBytes)
         logInfo("Got the output locations")
         mapStatuses.put(shuffleId, fetchedStatuses)
-        if (fetchedStatuses.contains(null)) {
-          throw new FetchFailedException(null, shuffleId, -1, reduceId,
-            new Exception("Missing an output location for shuffle " + shuffleId))
-        }
       } finally {
         fetching.synchronized {
           fetching -= shuffleId
           fetching.notifyAll()
         }
       }
-      return fetchedStatuses.map(s =>
-        (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
+      return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
     } else {
-      return statuses.map(s =>
-        (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
+      return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
     }
   }
 
@@ -258,6 +252,28 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
 private[spark] object MapOutputTracker {
   private val LOG_BASE = 1.1
 
+  // Convert an array of MapStatuses to locations and sizes for a given reduce ID. If
+  // any of the statuses is null (indicating a missing location due to a failed mapper),
+  // throw a FetchFailedException.
+  def convertMapStatuses(
+      shuffleId: Int,
+      reduceId: Int,
+      statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
+    if (statuses == null) {
+      throw new FetchFailedException(null, shuffleId, -1, reduceId,
+        new Exception("Missing all output locations for shuffle " + shuffleId))
+    }
+    statuses.map {
+      status =>
+        if (status == null) {
+          throw new FetchFailedException(null, shuffleId, -1, reduceId,
+            new Exception("Missing an output location for shuffle " + shuffleId))
+        } else {
+          (status.address, decompressSize(status.compressedSizes(reduceId)))
+        }
+    }
+  }
+
   /**
    * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
    * We do this by encoding the log base 1.1 of the size as an integer, which can support
--
GitLab
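
For reference, here is a minimal, self-contained sketch of the null-handling pattern the patch introduces: both a missing status array (no cached output locations at all) and a null element (one mapper's output lost) surface as a fetch failure instead of a NullPointerException later in the reduce task. The types `SimpleMapStatus` and `SimpleFetchFailedException` are hypothetical stand-ins for Spark's internal `MapStatus`, `BlockManagerId`, and `FetchFailedException`, and the size decompression is omitted; this is an illustration of the logic, not the actual Spark code.

```scala
// Hypothetical stand-ins for Spark's internal types, for illustration only.
case class SimpleMapStatus(address: String, compressedSizes: Array[Byte])

class SimpleFetchFailedException(shuffleId: Int, reduceId: Int, cause: Exception)
  extends Exception("Fetch failed for shuffle " + shuffleId + ", reduce " + reduceId, cause)

object ConvertMapStatusesSketch {
  // Mirrors the patch's convertMapStatuses: translate per-map statuses into
  // (location, size) pairs for one reduce partition, turning missing data into
  // a fetch failure the scheduler can react to.
  def convertMapStatuses(
      shuffleId: Int,
      reduceId: Int,
      statuses: Array[SimpleMapStatus]): Array[(String, Long)] = {
    if (statuses == null) {
      // No cached statuses at all for this shuffle.
      throw new SimpleFetchFailedException(shuffleId, reduceId,
        new Exception("Missing all output locations for shuffle " + shuffleId))
    }
    statuses.map { status =>
      if (status == null) {
        // One mapper's output location is missing.
        throw new SimpleFetchFailedException(shuffleId, reduceId,
          new Exception("Missing an output location for shuffle " + shuffleId))
      } else {
        // Real Spark decompresses the recorded size here; the sketch just widens the byte.
        (status.address, status.compressedSizes(reduceId).toLong)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val ok = Array(SimpleMapStatus("host1:7077", Array[Byte](10, 20)))
    println(convertMapStatuses(0, 1, ok).toList)   // List((host1:7077,20))

    try {
      convertMapStatuses(0, 1, Array[SimpleMapStatus](null))
    } catch {
      case e: SimpleFetchFailedException => println("caught: " + e.getMessage)
    }
  }
}
```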