Skip to content
Snippets Groups Projects
Commit cf0cce90 authored by Sital Kedia's avatar Sital Kedia Committed by Davies Liu
Browse files

[SPARK-17113] [SHUFFLE] Job failure due to Executor OOM in offheap mode

## What changes were proposed in this pull request?

This PR fixes executor OOM in offheap mode due to bug in Cooperative Memory Management for UnsafeExternSorter.  UnsafeExternalSorter was checking if memory page is being used by upstream by comparing the base object address of the current page with the base object address of upstream. However, in case of offheap memory allocation, the base object addresses are always null, so there was no spilling happening and eventually the operator would OOM.

Following is the stack trace this issue addresses -
java.lang.OutOfMemoryError: Unable to acquire 1220 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:341)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:362)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:93)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:170)

## How was this patch tested?

Tested by running the failing job.

Author: Sital Kedia <skedia@fb.com>

Closes #14693 from sitalkedia/fix_offheap_oom.
parent 071eaaf9
No related branches found
No related tags found
No related merge requests found
......@@ -522,7 +522,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// is accessing the current record. We free this page in that caller's next loadNext()
// call.
for (MemoryBlock page : allocatedPages) {
if (!loaded || page.getBaseObject() != upstream.getBaseObject()) {
if (!loaded || page.pageNumber != ((UnsafeInMemorySorter.SortedIterator)upstream).getCurrentPageNumber()) {
released += page.size();
freePage(page);
} else {
......
......@@ -248,6 +248,7 @@ public final class UnsafeInMemorySorter {
private long baseOffset;
private long keyPrefix;
private int recordLength;
private long currentPageNumber;
private SortedIterator(int numRecords, int offset) {
this.numRecords = numRecords;
......@@ -262,6 +263,7 @@ public final class UnsafeInMemorySorter {
iter.baseOffset = baseOffset;
iter.keyPrefix = keyPrefix;
iter.recordLength = recordLength;
iter.currentPageNumber = currentPageNumber;
return iter;
}
......@@ -279,6 +281,7 @@ public final class UnsafeInMemorySorter {
public void loadNext() {
// This pointer points to a 4-byte record length, followed by the record's bytes
final long recordPointer = array.get(offset + position);
currentPageNumber = memoryManager.decodePageNumber(recordPointer);
baseObject = memoryManager.getPage(recordPointer);
baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4; // Skip over record length
recordLength = Platform.getInt(baseObject, baseOffset - 4);
......@@ -292,6 +295,10 @@ public final class UnsafeInMemorySorter {
@Override
public long getBaseOffset() { return baseOffset; }
public long getCurrentPageNumber() {
return currentPageNumber;
}
@Override
public int getRecordLength() { return recordLength; }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment