Skip to content
Snippets Groups Projects
Commit 3bc55287 authored by Davies Liu's avatar Davies Liu Committed by Davies Liu
Browse files

[SPARK-9946] [SPARK-9589] [SQL] fix NPE and thread-safety in TaskMemoryManager

Currently, we access the `page.pageNumer` after it's freed, that could be modified by other thread, cause NPE.

The same TaskMemoryManager could be used by multiple threads (for example, Python UDF and TransportScript), so it should be thread safe to allocate/free memory/page. The underlying Bitset and HashSet are not thread safe, we should put them inside a synchronized block.

cc JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes #8177 from davies/memory_manager.
parent 57c2d088
No related branches found
No related tags found
No related merge requests found
......@@ -144,14 +144,16 @@ public class TaskMemoryManager {
public void freePage(MemoryBlock page) {
assert (page.pageNumber != -1) :
"Called freePage() on memory that wasn't allocated with allocatePage()";
executorMemoryManager.free(page);
assert(allocatedPages.get(page.pageNumber));
pageTable[page.pageNumber] = null;
synchronized (this) {
allocatedPages.clear(page.pageNumber);
}
pageTable[page.pageNumber] = null;
if (logger.isTraceEnabled()) {
logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
}
// Cannot access a page once it's freed.
executorMemoryManager.free(page);
}
/**
......@@ -166,7 +168,9 @@ public class TaskMemoryManager {
public MemoryBlock allocate(long size) throws OutOfMemoryError {
assert(size > 0) : "Size must be positive, but got " + size;
final MemoryBlock memory = executorMemoryManager.allocate(size);
allocatedNonPageMemory.add(memory);
synchronized(allocatedNonPageMemory) {
allocatedNonPageMemory.add(memory);
}
return memory;
}
......@@ -176,8 +180,10 @@ public class TaskMemoryManager {
public void free(MemoryBlock memory) {
assert (memory.pageNumber == -1) : "Should call freePage() for pages, not free()";
executorMemoryManager.free(memory);
final boolean wasAlreadyRemoved = !allocatedNonPageMemory.remove(memory);
assert (!wasAlreadyRemoved) : "Called free() on memory that was already freed!";
synchronized(allocatedNonPageMemory) {
final boolean wasAlreadyRemoved = !allocatedNonPageMemory.remove(memory);
assert (!wasAlreadyRemoved) : "Called free() on memory that was already freed!";
}
}
/**
......@@ -223,9 +229,10 @@ public class TaskMemoryManager {
if (inHeap) {
final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
final Object page = pageTable[pageNumber].getBaseObject();
final MemoryBlock page = pageTable[pageNumber];
assert (page != null);
return page;
assert (page.getBaseObject() != null);
return page.getBaseObject();
} else {
return null;
}
......@@ -244,7 +251,9 @@ public class TaskMemoryManager {
// converted the absolute address into a relative address. Here, we invert that operation:
final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
return pageTable[pageNumber].getBaseOffset() + offsetInPage;
final MemoryBlock page = pageTable[pageNumber];
assert (page != null);
return page.getBaseOffset() + offsetInPage;
}
}
......@@ -260,14 +269,17 @@ public class TaskMemoryManager {
freePage(page);
}
}
final Iterator<MemoryBlock> iter = allocatedNonPageMemory.iterator();
while (iter.hasNext()) {
final MemoryBlock memory = iter.next();
freedBytes += memory.size();
// We don't call free() here because that calls Set.remove, which would lead to a
// ConcurrentModificationException here.
executorMemoryManager.free(memory);
iter.remove();
synchronized (allocatedNonPageMemory) {
final Iterator<MemoryBlock> iter = allocatedNonPageMemory.iterator();
while (iter.hasNext()) {
final MemoryBlock memory = iter.next();
freedBytes += memory.size();
// We don't call free() here because that calls Set.remove, which would lead to a
// ConcurrentModificationException here.
executorMemoryManager.free(memory);
iter.remove();
}
}
return freedBytes;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment