diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java index 62c29c8cc1e4d1068f42b4df8bbe3fdc05cb3910..cbbe8594627a54d1a9cc021ea74c81bf083f3270 100644 --- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java +++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java @@ -17,6 +17,12 @@ package org.apache.spark.unsafe.memory; +import java.lang.ref.WeakReference; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import javax.annotation.concurrent.GuardedBy; + /** * Manages memory for an executor. Individual operators / tasks allocate memory through * {@link TaskMemoryManager} objects, which obtain their memory from ExecutorMemoryManager. @@ -33,6 +39,12 @@ public class ExecutorMemoryManager { */ final boolean inHeap; + @GuardedBy("this") + private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize = + new HashMap<Long, LinkedList<WeakReference<MemoryBlock>>>(); + + private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024; + /** * Construct a new ExecutorMemoryManager. * @@ -43,16 +55,57 @@ public class ExecutorMemoryManager { this.allocator = allocator; } + /** + * Returns true if allocations of the given size should go through the pooling mechanism and + * false otherwise. + */ + private boolean shouldPool(long size) { + // Very small allocations are less likely to benefit from pooling. + // At some point, we should explore supporting pooling for off-heap memory, but for now we'll + // ignore that case in the interest of simplicity. + return size >= POOLING_THRESHOLD_BYTES && allocator instanceof HeapMemoryAllocator; + } + /** * Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed * to be zeroed out (call `zero()` on the result if this is necessary). */ MemoryBlock allocate(long size) throws OutOfMemoryError { - return allocator.allocate(size); + if (shouldPool(size)) { + synchronized (this) { + final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size); + if (pool != null) { + while (!pool.isEmpty()) { + final WeakReference<MemoryBlock> blockReference = pool.pop(); + final MemoryBlock memory = blockReference.get(); + if (memory != null) { + assert (memory.size() == size); + return memory; + } + } + bufferPoolsBySize.remove(size); + } + } + return allocator.allocate(size); + } else { + return allocator.allocate(size); + } } void free(MemoryBlock memory) { - allocator.free(memory); + final long size = memory.size(); + if (shouldPool(size)) { + synchronized (this) { + LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size); + if (pool == null) { + pool = new LinkedList<WeakReference<MemoryBlock>>(); + bufferPoolsBySize.put(size, pool); + } + pool.add(new WeakReference<MemoryBlock>(memory)); + } + } else { + allocator.free(memory); + } } }