Skip to content
Snippets Groups Projects
Commit 31c53e91 authored by Denny's avatar Denny
Browse files

Use stageId as index for fileSet caches.

parent 4d3471dd
No related branches found
No related tags found
No related merge requests found
...@@ -22,6 +22,7 @@ object ShuffleMapTask { ...@@ -22,6 +22,7 @@ object ShuffleMapTask {
// expensive on the master node if it needs to launch thousands of tasks. // expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new JHashMap[Int, Array[Byte]] val serializedInfoCache = new JHashMap[Int, Array[Byte]]
val fileSetCache = new JHashMap[Int, Array[Byte]] val fileSetCache = new JHashMap[Int, Array[Byte]]
val jarSetCache = new JHashMap[Int, Array[Byte]]
def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_]): Array[Byte] = { def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_]): Array[Byte] = {
synchronized { synchronized {
...@@ -42,8 +43,8 @@ object ShuffleMapTask { ...@@ -42,8 +43,8 @@ object ShuffleMapTask {
} }
// Since both the JarSet and FileSet have the same format this is used for both. // Since both the JarSet and FileSet have the same format this is used for both.
def serializeFileSet(set : HashMap[String, Long]) : Array[Byte] = { def serializeFileSet(set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = {
val old = fileSetCache.get(set.hashCode) val old = cache.get(stageId)
if (old != null) { if (old != null) {
return old return old
} else { } else {
...@@ -52,7 +53,7 @@ object ShuffleMapTask { ...@@ -52,7 +53,7 @@ object ShuffleMapTask {
objOut.writeObject(set.toArray) objOut.writeObject(set.toArray)
objOut.close() objOut.close()
val bytes = out.toByteArray val bytes = out.toByteArray
fileSetCache.put(set.hashCode, bytes) cache.put(stageId, bytes)
return bytes return bytes
} }
} }
...@@ -84,6 +85,7 @@ object ShuffleMapTask { ...@@ -84,6 +85,7 @@ object ShuffleMapTask {
synchronized { synchronized {
serializedInfoCache.clear() serializedInfoCache.clear()
fileSetCache.clear() fileSetCache.clear()
jarSetCache.clear()
} }
} }
} }
...@@ -112,10 +114,10 @@ class ShuffleMapTask( ...@@ -112,10 +114,10 @@ class ShuffleMapTask(
out.writeInt(bytes.length) out.writeInt(bytes.length)
out.write(bytes) out.write(bytes)
val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet) val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet, stageId, ShuffleMapTask.fileSetCache)
out.writeInt(fileSetBytes.length) out.writeInt(fileSetBytes.length)
out.write(fileSetBytes) out.write(fileSetBytes)
val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet) val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet, stageId, ShuffleMapTask.jarSetCache)
out.writeInt(jarSetBytes.length) out.writeInt(jarSetBytes.length)
out.write(jarSetBytes) out.write(jarSetBytes)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment