Commit 7ff8d68c authored by Andrew Or

[SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array

When `TungstenAggregation` hits memory pressure, it switches from hash-based to sort-based aggregation in-place. However, in the process we try to allocate the pointer array for writing to the new `UnsafeExternalSorter` *before* actually freeing the memory from the hash map. This led to the following exception:
```
 java.io.IOException: Could not acquire 65536 bytes of memory
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220)
        at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:126)
        at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435)
```
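The fix is to reorder these steps: skip the pointer-array allocation inside the spill, release the hash map's pages first, and only then allocate the new array. A minimal sketch of the corrected call ordering, using the method names from the diff below (this is an excerpt, not standalone runnable code; `sorter` and `map` come from the surrounding `UnsafeKVExternalSorter` constructor):

```java
// Old ordering (could throw the IOException above): spill() internally
// re-allocated the pointer array while the hash map still held its pages.
//   sorter.spill();
//   map.free();

// New ordering (SPARK-10474): spill without re-allocating, release the map's
// pages, then allocate the pointer array for the new UnsafeExternalSorter.
sorter.spill(false /* initialize for writing */);
map.free();
sorter.initializeForWriting();
```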

Author: Andrew Or <andrew@databricks.com>

Closes #8827 from andrewor14/allocate-pointer-array.
parent 22be2ae1
`UnsafeExternalSorter.java`:

```diff
@@ -159,7 +159,7 @@ public final class UnsafeExternalSorter {
   /**
    * Allocates new sort data structures. Called when creating the sorter and after each spill.
    */
-  private void initializeForWriting() throws IOException {
+  public void initializeForWriting() throws IOException {
     this.writeMetrics = new ShuffleWriteMetrics();
     final long pointerArrayMemory =
       UnsafeInMemorySorter.getMemoryRequirementsForPointerArray(initialSize);
@@ -187,6 +187,14 @@ public final class UnsafeExternalSorter {
    * Sort and spill the current records in response to memory pressure.
    */
   public void spill() throws IOException {
+    spill(true);
+  }
+
+  /**
+   * Sort and spill the current records in response to memory pressure.
+   * @param shouldInitializeForWriting whether to allocate memory for writing after the spill
+   */
+  public void spill(boolean shouldInitializeForWriting) throws IOException {
     assert(inMemSorter != null);
     logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
       Thread.currentThread().getId(),
@@ -217,7 +225,9 @@ public final class UnsafeExternalSorter {
     // written to disk. This also counts the space needed to store the sorter's pointer array.
     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
-    initializeForWriting();
+    if (shouldInitializeForWriting) {
+      initializeForWriting();
+    }
   }

   /**
```
`UnsafeKVExternalSorter.java`:

```diff
@@ -85,6 +85,7 @@ public final class UnsafeKVExternalSorter {
       // We will use the number of elements in the map as the initialSize of the
       // UnsafeInMemorySorter. Because UnsafeInMemorySorter does not accept 0 as the initialSize,
       // we will use 1 as its initial size if the map is empty.
+      // TODO: track pointer array memory used by this in-memory sorter!
       final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
         taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements()));
@@ -123,8 +124,13 @@ public final class UnsafeKVExternalSorter {
         pageSizeBytes,
         inMemSorter);

-      sorter.spill();
+      // Note: This spill doesn't actually release any memory, so if we try to allocate a new
+      // pointer array immediately after the spill then we may fail to acquire sufficient space
+      // for it (SPARK-10474). For this reason, we must initialize for writing explicitly *after*
+      // we have actually freed memory from our map.
+      sorter.spill(false /* initialize for writing */);
       map.free();
+      sorter.initializeForWriting();
     }
   }
```
`UnsafeFixedWidthAggregationMapSuite.scala`:

```diff
@@ -23,9 +23,10 @@ import scala.util.{Try, Random}
 import org.scalatest.Matchers

-import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
 import org.apache.spark.{TaskContextImpl, TaskContext, SparkFunSuite}
+import org.apache.spark.shuffle.ShuffleMemoryManager
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
@@ -325,7 +326,7 @@ class UnsafeFixedWidthAggregationMapSuite
       // At here, we also test if copy is correct.
       iter.getKey.copy()
       iter.getValue.copy()
-      count += 1;
+      count += 1
     }

     // 1 record was from the map and 4096 records were explicitly inserted.
@@ -333,4 +334,48 @@ class UnsafeFixedWidthAggregationMapSuite
     map.free()
   }

+  testWithMemoryLeakDetection("convert to external sorter under memory pressure (SPARK-10474)") {
+    val smm = ShuffleMemoryManager.createForTesting(65536)
+    val pageSize = 4096
+    val map = new UnsafeFixedWidthAggregationMap(
+      emptyAggregationBuffer,
+      aggBufferSchema,
+      groupKeySchema,
+      taskMemoryManager,
+      smm,
+      128, // initial capacity
+      pageSize,
+      false // disable perf metrics
+    )
+
+    // Insert into the map until we've run out of space
+    val rand = new Random(42)
+    var hasSpace = true
+    while (hasSpace) {
+      val str = rand.nextString(1024)
+      val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+      if (buf == null) {
+        hasSpace = false
+      } else {
+        buf.setInt(0, str.length)
+      }
+    }
+
+    // Ensure we're actually maxed out by asserting that we can't acquire even just 1 byte
+    assert(smm.tryToAcquire(1) === 0)
+
+    // Convert the map into a sorter. This used to fail before the fix for SPARK-10474
+    // because we would try to acquire space for the in-memory sorter pointer array before
+    // actually releasing the pages despite having spilled all of them.
+    var sorter: UnsafeKVExternalSorter = null
+    try {
+      sorter = map.destructAndCreateExternalSorter()
+    } finally {
+      if (sorter != null) {
+        sorter.cleanupResources()
+      }
+    }
+  }
 }
```