Skip to content
Snippets Groups Projects
Commit 3a6a95dc authored by Reynold Xin
Browse files

Removed the deserialization cache for ShuffleMapTask because it was
causing concurrency problems (some variables in Shark get set to null).
The cost of task deserialization on slaves is trivial compared with the
execution time of the task anyway.
parent 51453eb8
No related branches found
No related tags found
No related merge requests found
......@@ -16,8 +16,11 @@ import spark._
import spark.storage._
object ShuffleMapTask {
// A simple map from the stage id to the serialized byte array of a task.
// Serves as a cache for task serialization, because serialization can be
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new HashMap[Int, Array[Byte]]
val deserializedInfoCache = new HashMap[Int, (RDD[_], ShuffleDependency[_,_,_])]
def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_]): Array[Byte] = {
synchronized {
......@@ -39,29 +42,21 @@ object ShuffleMapTask {
def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_,_]) = {
synchronized {
val old = deserializedInfoCache.get(stageId)
if (old != null) {
return old
} else {
val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val objIn = new ObjectInputStream(in) {
override def resolveClass(desc: ObjectStreamClass) =
Class.forName(desc.getName, false, loader)
}
val rdd = objIn.readObject().asInstanceOf[RDD[_]]
val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]]
val tuple = (rdd, dep)
deserializedInfoCache.put(stageId, tuple)
return tuple
val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val objIn = new ObjectInputStream(in) {
override def resolveClass(desc: ObjectStreamClass) =
Class.forName(desc.getName, false, loader)
}
val rdd = objIn.readObject().asInstanceOf[RDD[_]]
val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]]
return (rdd, dep)
}
}
// Empties the task-info caches held by this object.
// Both `serializedInfoCache` and `deserializedInfoCache` are cleared inside a
// single `synchronized` block, i.e. while holding this object's monitor, so the
// two clears happen under one lock acquisition.
def clearCache() {
synchronized {
serializedInfoCache.clear()
deserializedInfoCache.clear()
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment