diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index f78e0e5fb258f88b283e4838d0c7fe7ac0a51f73..e0e050d7c96498915e614349aa1f0a584a8941e0 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -16,8 +16,11 @@ import spark._ import spark.storage._ object ShuffleMapTask { + + // A simple map between the stage id to the serialized byte array of a task. + // Served as a cache for task serialization because serialization can be + // expensive on the master node if it needs to launch thousands of tasks. val serializedInfoCache = new HashMap[Int, Array[Byte]] - val deserializedInfoCache = new HashMap[Int, (RDD[_], ShuffleDependency[_,_,_])] def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_]): Array[Byte] = { synchronized { @@ -39,29 +42,21 @@ object ShuffleMapTask { def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_,_]) = { synchronized { - val old = deserializedInfoCache.get(stageId) - if (old != null) { - return old - } else { - val loader = Thread.currentThread.getContextClassLoader - val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) - val objIn = new ObjectInputStream(in) { - override def resolveClass(desc: ObjectStreamClass) = - Class.forName(desc.getName, false, loader) - } - val rdd = objIn.readObject().asInstanceOf[RDD[_]] - val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]] - val tuple = (rdd, dep) - deserializedInfoCache.put(stageId, tuple) - return tuple + val loader = Thread.currentThread.getContextClassLoader + val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) + val objIn = new ObjectInputStream(in) { + override def resolveClass(desc: ObjectStreamClass) = + Class.forName(desc.getName, false, loader) } + val rdd = objIn.readObject().asInstanceOf[RDD[_]] + val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]] + return (rdd, dep) } } def clearCache() { synchronized { serializedInfoCache.clear() - deserializedInfoCache.clear() } } }