From feadaf72f44e7c66521c03171592671d4c441bda Mon Sep 17 00:00:00 2001
From: Charles Reiss <charles@eecs.berkeley.edu>
Date: Mon, 31 Dec 2012 14:05:11 -0800
Subject: [PATCH] Mark key as not loading in CacheTracker even when compute()
 fails

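Previously, rdd.compute(split, context) and the ArrayBuffer collecting its
output ran before the try/finally block that clears the key from the
`loading` set. If compute() threw, the finally block was never reached, so
the key stayed marked as loading and loading.notifyAll() was never called;
any other task waiting for that partition would keep waiting on a key that
was no longer being computed. Moving the computation (and the return of its
iterator) inside the try block ensures the finally clause always removes
the `loading` entry, whether the computation succeeds or fails.

A simplified sketch of the pattern this applies (illustrative names only,
not the actual CacheTracker code):

    // Cleanup in `finally` runs whether or not the computation throws.
    def loadOnce[T](loading: scala.collection.mutable.HashSet[String],
                    key: String)(body: => T): T = {
      loading.synchronized { loading.add(key) }
      try {
        body                    // may throw
      } finally {
        loading.synchronized {
          loading.remove(key)   // always un-mark the key
          loading.notifyAll()   // wake any tasks waiting on it
        }
      }
    }
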
---
 core/src/main/scala/spark/CacheTracker.scala | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 3d79078733..c8c4063cad 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -202,26 +202,26 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
             loading.add(key)
           }
         }
-        // If we got here, we have to load the split
-        // Tell the master that we're doing so
-        //val host = System.getProperty("spark.hostname", Utils.localHostName)
-        //val future = trackerActor !! AddedToCache(rdd.id, split.index, host)
-        // TODO: fetch any remote copy of the split that may be available
-        // TODO: also register a listener for when it unloads
-        logInfo("Computing partition " + split)
-        val elements = new ArrayBuffer[Any]
-        elements ++= rdd.compute(split, context)
         try {
+          // If we got here, we have to load the split
+          // Tell the master that we're doing so
+          //val host = System.getProperty("spark.hostname", Utils.localHostName)
+          //val future = trackerActor !! AddedToCache(rdd.id, split.index, host)
+          // TODO: fetch any remote copy of the split that may be available
+          // TODO: also register a listener for when it unloads
+          val elements = new ArrayBuffer[Any]
+          logInfo("Computing partition " + split)
+          elements ++= rdd.compute(split, context)
           // Try to put this block in the blockManager
           blockManager.put(key, elements, storageLevel, true)
           //future.apply() // Wait for the reply from the cache tracker
+          return elements.iterator.asInstanceOf[Iterator[T]]
         } finally {
           loading.synchronized {
             loading.remove(key)
             loading.notifyAll()
           }
         }
-        return elements.iterator.asInstanceOf[Iterator[T]]
     }
   }
 
-- 
GitLab