Commit 7956dd7a authored by Josh Rosen

[SPARK-7698] Cache and reuse buffers in ExecutorMemoryAllocator when using heap allocation

When on-heap memory allocation is used, ExecutorMemoryManager should maintain a cache / pool of buffers for re-use by tasks. This will significantly improve the performance of the new Tungsten sort shuffle for jobs with many short-lived tasks by eliminating a major source of garbage-collection pressure.

This pull request is a minimum viable implementation of this idea. In its current form, the patch significantly improves performance on a stress test that launches huge numbers of short-lived shuffle map tasks back-to-back in the same JVM.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #6227 from JoshRosen/SPARK-7698 and squashes the following commits:

fd6cb55 [Josh Rosen] SoftReference -> WeakReference
b154e86 [Josh Rosen] WIP sketch of pooling in ExecutorMemoryManager
parent 3c434cbf
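
For context on the final squash commit (SoftReference -> WeakReference): pooled buffers are held only through WeakReferences, so the pool never prevents the garbage collector from reclaiming them, and a pooled block can vanish at any time. That is why allocate() in the diff below checks blockReference.get() for null before reusing a block. A minimal, standalone JDK illustration of this behavior (not Spark code; the exact outcome depends on when the collector actually runs):

import java.lang.ref.WeakReference;

public class WeakReferenceDemo {
  public static void main(String[] args) throws InterruptedException {
    byte[] buffer = new byte[1024 * 1024];
    WeakReference<byte[]> ref = new WeakReference<byte[]>(buffer);

    // While a strong reference exists, get() returns the buffer.
    System.out.println("before GC: " + (ref.get() != null));   // true

    buffer = null;   // drop the last strong reference
    System.gc();     // only a hint, but usually sufficient to clear weak references
    Thread.sleep(100);

    // After collection the reference is typically cleared, so a pool built on
    // WeakReferences must be prepared to fall back to a fresh allocation.
    System.out.println("after GC: " + (ref.get() != null));    // usually false
  }
}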
@@ -17,6 +17,12 @@
 
 package org.apache.spark.unsafe.memory;
 
+import java.lang.ref.WeakReference;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import javax.annotation.concurrent.GuardedBy;
+
 /**
  * Manages memory for an executor. Individual operators / tasks allocate memory through
  * {@link TaskMemoryManager} objects, which obtain their memory from ExecutorMemoryManager.
@@ -33,6 +39,12 @@ public class ExecutorMemoryManager {
    */
   final boolean inHeap;
 
+  @GuardedBy("this")
+  private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize =
+    new HashMap<Long, LinkedList<WeakReference<MemoryBlock>>>();
+
+  private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
+
   /**
    * Construct a new ExecutorMemoryManager.
    *
@@ -43,16 +55,57 @@ public class ExecutorMemoryManager {
     this.allocator = allocator;
   }
 
+  /**
+   * Returns true if allocations of the given size should go through the pooling mechanism and
+   * false otherwise.
+   */
+  private boolean shouldPool(long size) {
+    // Very small allocations are less likely to benefit from pooling.
+    // At some point, we should explore supporting pooling for off-heap memory, but for now we'll
+    // ignore that case in the interest of simplicity.
+    return size >= POOLING_THRESHOLD_BYTES && allocator instanceof HeapMemoryAllocator;
+  }
+
   /**
    * Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed
    * to be zeroed out (call `zero()` on the result if this is necessary).
    */
   MemoryBlock allocate(long size) throws OutOfMemoryError {
-    return allocator.allocate(size);
+    if (shouldPool(size)) {
+      synchronized (this) {
+        final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+        if (pool != null) {
+          while (!pool.isEmpty()) {
+            final WeakReference<MemoryBlock> blockReference = pool.pop();
+            final MemoryBlock memory = blockReference.get();
+            if (memory != null) {
+              assert (memory.size() == size);
+              return memory;
+            }
+          }
+          bufferPoolsBySize.remove(size);
+        }
+      }
+      return allocator.allocate(size);
+    } else {
+      return allocator.allocate(size);
+    }
   }
 
   void free(MemoryBlock memory) {
-    allocator.free(memory);
+    final long size = memory.size();
+    if (shouldPool(size)) {
+      synchronized (this) {
+        LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+        if (pool == null) {
+          pool = new LinkedList<WeakReference<MemoryBlock>>();
+          bufferPoolsBySize.put(size, pool);
+        }
+        pool.add(new WeakReference<MemoryBlock>(memory));
+      }
+    } else {
+      allocator.free(memory);
+    }
   }
 
 }
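
As a rough usage sketch of the new behavior (hypothetical test code: it assumes it lives in the org.apache.spark.unsafe.memory package, since allocate() and free() are package-private, and that MemoryAllocator.HEAP provides a HeapMemoryAllocator as elsewhere in the unsafe module):

package org.apache.spark.unsafe.memory;

public class BufferPoolingExample {
  public static void main(String[] args) {
    ExecutorMemoryManager manager = new ExecutorMemoryManager(MemoryAllocator.HEAP);

    // 1 MB meets POOLING_THRESHOLD_BYTES, so this block is eligible for pooling.
    MemoryBlock first = manager.allocate(1024 * 1024);
    manager.free(first);  // parked in bufferPoolsBySize rather than released

    // A later request of the same size reuses the pooled block, unless the
    // garbage collector has already cleared its WeakReference in the meantime.
    MemoryBlock second = manager.allocate(1024 * 1024);
    System.out.println("reused pooled block: " + (second == first));

    // Allocations below the 1 MB threshold (and all off-heap allocations)
    // bypass the pool and go straight to the underlying allocator.
    MemoryBlock small = manager.allocate(4096);
    manager.free(small);
  }
}

Because the pool holds only WeakReferences, the cached buffers add no retained-heap pressure of their own: an idle executor's pool can be collected entirely.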