Skip to content
Snippets Groups Projects
Commit 291abc2c authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #181 from rxin/dev

Removed the deserialization cache for ShuffleMapTask
parents 51453eb8 3a6a95dc
No related branches found
No related tags found
No related merge requests found
......@@ -16,8 +16,11 @@ import spark._
import spark.storage._
object ShuffleMapTask {
// A simple map between the stage id to the serialized byte array of a task.
// Served as a cache for task serialization because serialization can be
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new HashMap[Int, Array[Byte]]
val deserializedInfoCache = new HashMap[Int, (RDD[_], ShuffleDependency[_,_,_])]
def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_]): Array[Byte] = {
synchronized {
......@@ -39,29 +42,21 @@ object ShuffleMapTask {
def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_,_]) = {
synchronized {
val old = deserializedInfoCache.get(stageId)
if (old != null) {
return old
} else {
val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val objIn = new ObjectInputStream(in) {
override def resolveClass(desc: ObjectStreamClass) =
Class.forName(desc.getName, false, loader)
}
val rdd = objIn.readObject().asInstanceOf[RDD[_]]
val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]]
val tuple = (rdd, dep)
deserializedInfoCache.put(stageId, tuple)
return tuple
val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val objIn = new ObjectInputStream(in) {
override def resolveClass(desc: ObjectStreamClass) =
Class.forName(desc.getName, false, loader)
}
val rdd = objIn.readObject().asInstanceOf[RDD[_]]
val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]]
return (rdd, dep)
}
}
def clearCache() {
synchronized {
serializedInfoCache.clear()
deserializedInfoCache.clear()
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment