Skip to content
Snippets Groups Projects
Commit d187e7de authored by Sital Kedia's avatar Sital Kedia Committed by Davies Liu
Browse files

[SPARK-14363] Fix executor OOM due to memory leak in the Sorter

## What changes were proposed in this pull request?

Fix a memory leak in the Sorter. When the UnsafeExternalSorter spills data to disk, it does not free the underlying pointer array. As a result, we see many executor OOMs as well as memory underutilization.
This is a regression partially introduced in PR https://github.com/apache/spark/pull/9241

## How was this patch tested?

Tested by running a job; observed around a 30% speedup after this change.

Author: Sital Kedia <skedia@fb.com>

Closes #12285 from sitalkedia/executor_oom.
parent c439d88e
No related branches found
No related tags found
No related merge requests found
...@@ -215,8 +215,6 @@ final class ShuffleExternalSorter extends MemoryConsumer { ...@@ -215,8 +215,6 @@ final class ShuffleExternalSorter extends MemoryConsumer {
} }
} }
inMemSorter.reset();
if (!isLastFile) { // i.e. this is a spill file if (!isLastFile) { // i.e. this is a spill file
// The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
// are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter // are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
...@@ -255,6 +253,10 @@ final class ShuffleExternalSorter extends MemoryConsumer { ...@@ -255,6 +253,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
writeSortedFile(false); writeSortedFile(false);
final long spillSize = freeMemory(); final long spillSize = freeMemory();
inMemSorter.reset();
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
// records. Otherwise, if the task has over-allocated memory, then without freeing the memory pages,
// we might not be able to get memory for the pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
return spillSize; return spillSize;
} }
......
...@@ -51,9 +51,12 @@ final class ShuffleInMemorySorter { ...@@ -51,9 +51,12 @@ final class ShuffleInMemorySorter {
*/ */
private int pos = 0; private int pos = 0;
private int initialSize;
ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize) { ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize) {
this.consumer = consumer; this.consumer = consumer;
assert (initialSize > 0); assert (initialSize > 0);
this.initialSize = initialSize;
this.array = consumer.allocateArray(initialSize); this.array = consumer.allocateArray(initialSize);
this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE); this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE);
} }
...@@ -70,6 +73,10 @@ final class ShuffleInMemorySorter { ...@@ -70,6 +73,10 @@ final class ShuffleInMemorySorter {
} }
public void reset() { public void reset() {
if (consumer != null) {
consumer.freeArray(array);
this.array = consumer.allocateArray(initialSize);
}
pos = 0; pos = 0;
} }
......
...@@ -200,14 +200,17 @@ public final class UnsafeExternalSorter extends MemoryConsumer { ...@@ -200,14 +200,17 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix()); spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
} }
spillWriter.close(); spillWriter.close();
inMemSorter.reset();
} }
final long spillSize = freeMemory(); final long spillSize = freeMemory();
// Note that this is more-or-less going to be a multiple of the page size, so wasted space in // Note that this is more-or-less going to be a multiple of the page size, so wasted space in
// pages will currently be counted as memory spilled even though that space isn't actually // pages will currently be counted as memory spilled even though that space isn't actually
// written to disk. This also counts the space needed to store the sorter's pointer array. // written to disk. This also counts the space needed to store the sorter's pointer array.
inMemSorter.reset();
// Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
// records. Otherwise, if the task has over-allocated memory, then without freeing the memory pages,
// we might not be able to get memory for the pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
return spillSize; return spillSize;
......
...@@ -84,6 +84,8 @@ public final class UnsafeInMemorySorter { ...@@ -84,6 +84,8 @@ public final class UnsafeInMemorySorter {
*/ */
private int pos = 0; private int pos = 0;
private long initialSize;
public UnsafeInMemorySorter( public UnsafeInMemorySorter(
final MemoryConsumer consumer, final MemoryConsumer consumer,
final TaskMemoryManager memoryManager, final TaskMemoryManager memoryManager,
...@@ -102,6 +104,7 @@ public final class UnsafeInMemorySorter { ...@@ -102,6 +104,7 @@ public final class UnsafeInMemorySorter {
LongArray array) { LongArray array) {
this.consumer = consumer; this.consumer = consumer;
this.memoryManager = memoryManager; this.memoryManager = memoryManager;
this.initialSize = array.size();
if (recordComparator != null) { if (recordComparator != null) {
this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE); this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager); this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
...@@ -123,6 +126,10 @@ public final class UnsafeInMemorySorter { ...@@ -123,6 +126,10 @@ public final class UnsafeInMemorySorter {
} }
public void reset() { public void reset() {
if (consumer != null) {
consumer.freeArray(array);
this.array = consumer.allocateArray(initialSize);
}
pos = 0; pos = 0;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment