Skip to content
Snippets Groups Projects
Commit 6e3abed8 authored by Davies Liu's avatar Davies Liu Committed by Davies Liu
Browse files

[SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMap


## What changes were proposed in this pull request?

Radix sort requires that half of the array be free (as temporary space), so we use 0.5 as the scale factor to make sure that BytesToBytesMap will not have more items than 1/2 of its capacity. It turned out this was not guaranteed: the current implementation of append() could leave 1 more item than the threshold (1/2 of capacity) in the array, which breaks the requirement of radix sort (failing the assert in 2.2, or failing to insert into InMemorySorter in 2.1).

This PR fixes the off-by-one bug in BytesToBytesMap.

This PR also fixes a bug where the array would never grow if it failed to grow once (staying at its initial capacity), introduced by #15722.

## How was this patch tested?

Added regression test.

Author: Davies Liu <davies@databricks.com>

Closes #16844 from davies/off_by_one.

(cherry picked from commit 3d0c3af0)
Signed-off-by: Davies Liu <davies.liu@gmail.com>
parent 55958bcd
No related branches found
No related tags found
No related merge requests found
......@@ -698,7 +698,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
if (numKeys == MAX_CAPACITY
// The map could be reused from last spill (because of no enough memory to grow),
// then we don't try to grow again if hit the `growthThreshold`.
|| !canGrowArray && numKeys > growthThreshold) {
|| !canGrowArray && numKeys >= growthThreshold) {
return false;
}
......@@ -742,7 +742,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
longArray.set(pos * 2 + 1, keyHashcode);
isDefined = true;
if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
try {
growAndRehash();
} catch (OutOfMemoryError oom) {
......@@ -911,6 +911,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
freePage(dataPage);
}
allocate(initialCapacity);
canGrowArray = true;
currentPage = null;
pageCursor = 0;
}
......
......@@ -342,4 +342,44 @@ class UnsafeFixedWidthAggregationMapSuite
}
}
// Regression test for SPARK-19500: append() could leave one more entry than the
// growth threshold (1/2 of capacity) in the map, breaking the radix-sort
// requirement that half of the pointer array stays free.
testWithMemoryLeakDetection("convert to external sorter after fail to grow (SPARK-19500)") {
  val pageSize = 4096000
  val map = new UnsafeFixedWidthAggregationMap(
    emptyAggregationBuffer,
    aggBufferSchema,
    groupKeySchema,
    taskMemoryManager,
    128, // initial capacity
    pageSize,
    false // disable perf metrics
  )
  // Fixed seed keeps the generated keys deterministic across runs.
  val rand = new Random(42)
  // Fill the map to just below the growth threshold (capacity 128 -> threshold 64).
  for (i <- 1 to 63) {
    val str = rand.nextString(1024)
    val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
    buf.setInt(0, str.length)
  }
  // Simulate running out of space
  memoryManager.limit(0)
  // The 64th distinct key can still be inserted (reaching the threshold)...
  var str = rand.nextString(1024)
  var buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
  assert(buf != null)
  // ...but the next insert must be rejected: with no memory to grow, accepting
  // it would exceed half of the capacity (the pre-fix off-by-one behavior).
  str = rand.nextString(1024)
  buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
  assert(buf == null)
  // Converting the map into a sorter must succeed: the in-memory sorter needs
  // half of the pointer array free, which the threshold check above guarantees.
  // NOTE(review): an earlier version of this comment referenced SPARK-10474;
  // this test actually covers the SPARK-19500 off-by-one fix.
  var sorter: UnsafeKVExternalSorter = null
  try {
    sorter = map.destructAndCreateExternalSorter()
    map.free()
  } finally {
    // Always release sorter resources, even if the conversion above throws.
    if (sorter != null) {
      sorter.cleanupResources()
    }
  }
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment