From 3a6a95dc2470ca2b5e706c174ffd8c048e70b407 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@cs.berkeley.edu>
Date: Mon, 27 Aug 2012 22:33:15 -0700
Subject: [PATCH] Removed the deserialization cache for ShuffleMapTask because
 it was causing concurrency problems (some variables in Shark were getting set
 to null). The cost of deserializing a task on the slaves is trivial compared
 with the task's execution time anyway.
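
For context, a minimal, self-contained sketch of the hazard a per-stage
deserialization cache creates (TaskInfo, scratch, and CacheHazardDemo are
hypothetical names used only for illustration; they are not Spark or Shark
classes): when every task of a stage is handed the same cached deserialized
object, mutable state written by one task is visible to, and clobbered by,
the other tasks running concurrently on the same executor.

    import java.io._
    import scala.collection.mutable.HashMap

    // Hypothetical stand-in for the deserialized (rdd, dep) pair; Shark's
    // operators carry mutable fields that each task expects to initialize
    // for itself.
    class TaskInfo extends Serializable {
      @transient var scratch: String = _  // null right after deserialization
    }

    object CacheHazardDemo {
      private val cache = new HashMap[Int, TaskInfo]

      // Old behaviour: every task of a stage gets the SAME cached instance.
      def cachedInfo(stageId: Int, bytes: Array[Byte]): TaskInfo = synchronized {
        cache.getOrElseUpdate(stageId, deserialize(bytes))
      }

      // New behaviour: every task deserializes its own fresh instance.
      def freshInfo(bytes: Array[Byte]): TaskInfo = deserialize(bytes)

      private def deserialize(bytes: Array[Byte]): TaskInfo = {
        val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
        try in.readObject().asInstanceOf[TaskInfo] finally in.close()
      }

      def main(args: Array[String]) {
        val bos = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bos)
        out.writeObject(new TaskInfo)
        out.close()
        val bytes = bos.toByteArray

        // Two "tasks" for stage 0 share one cached object, so the second
        // task's reset is visible to the first one.
        val t1 = cachedInfo(0, bytes); t1.scratch = "state for task 1"
        val t2 = cachedInfo(0, bytes); t2.scratch = null
        println(t1.scratch)  // null -- task 1's state was silently clobbered

        // With a fresh copy per task, each task owns its own state.
        val f1 = freshInfo(bytes); f1.scratch = "state for task 1"
        val f2 = freshInfo(bytes); f2.scratch = null
        println(f1.scratch)  // "state for task 1"
      }
    }

Keeping serializedInfoCache is still fine because the cached byte arrays are
never mutated; it is the shared deserialized object graph that made the
removed cache unsafe under concurrent tasks.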

---
 .../spark/scheduler/ShuffleMapTask.scala      | 29 ++++++++-----------
 1 file changed, 12 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index f78e0e5fb2..e0e050d7c9 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -16,8 +16,11 @@ import spark._
 import spark.storage._
 
 object ShuffleMapTask {
+
+  // A simple map from a stage id to the serialized byte array of its task.
+  // Serves as a cache for task serialization, because serialization can be
+  // expensive on the master node if it needs to launch thousands of tasks.
   val serializedInfoCache = new HashMap[Int, Array[Byte]]
-  val deserializedInfoCache = new HashMap[Int, (RDD[_], ShuffleDependency[_,_,_])]
 
   def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_]): Array[Byte] = {
     synchronized {
@@ -39,29 +42,21 @@ object ShuffleMapTask {
 
   def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_,_]) = {
     synchronized {
-      val old = deserializedInfoCache.get(stageId)
-      if (old != null) {
-        return old
-      } else {
-        val loader = Thread.currentThread.getContextClassLoader
-        val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-        val objIn = new ObjectInputStream(in) {
-          override def resolveClass(desc: ObjectStreamClass) =
-            Class.forName(desc.getName, false, loader)
-        }
-        val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-        val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]]
-        val tuple = (rdd, dep)
-        deserializedInfoCache.put(stageId, tuple)
-        return tuple
+      val loader = Thread.currentThread.getContextClassLoader
+      val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+      val objIn = new ObjectInputStream(in) {
+        override def resolveClass(desc: ObjectStreamClass) =
+          Class.forName(desc.getName, false, loader)
       }
+      val rdd = objIn.readObject().asInstanceOf[RDD[_]]
+      val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]]
+      return (rdd, dep)
     }
   }
 
   def clearCache() {
     synchronized {
       serializedInfoCache.clear()
-      deserializedInfoCache.clear()
     }
   }
 }
-- 
GitLab