Commit 722afbb2 authored by Eric Liang, committed by Josh Rosen

[SPARK-17405] RowBasedKeyValueBatch should use default page size to prevent OOMs

## What changes were proposed in this pull request?

Before this change, we would always allocate 64MB per aggregation task for the first-level hash map storage, even when running in low-memory situations such as local mode. This changes it to use the memory manager default page size, which is automatically reduced from 64MB in these situations.
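The key idea is that the memory manager derives a page size from available execution memory and clamps it to a sane range, rather than always using 64MB. Below is a minimal, illustrative Java sketch of that kind of heuristic; the names (`PageSizeSketch`, `pageSizeBytes`, `SAFETY_FACTOR`) and the exact formula are assumptions for illustration, not Spark's actual implementation.

```java
// Illustrative sketch (NOT Spark's exact code): derive a per-task page size
// from available execution memory, clamped to [1 MB, 64 MB]. The constants
// and formula here are assumed for demonstration purposes.
public final class PageSizeSketch {
    static final long MIN_PAGE_SIZE = 1L << 20;   // 1 MB floor
    static final long MAX_PAGE_SIZE = 64L << 20;  // 64 MB ceiling
    static final int SAFETY_FACTOR = 16;          // leave room for several pages per core

    // Round n up to the next power of two (n must be positive).
    static long nextPowerOf2(long n) {
        long highBit = Long.highestOneBit(n);
        return (highBit == n) ? n : highBit << 1;
    }

    static long pageSizeBytes(long maxExecutionMemory, int cores) {
        // Budget a fraction of execution memory per core, then round and clamp.
        long perCore = maxExecutionMemory / cores / SAFETY_FACTOR;
        long size = nextPowerOf2(perCore);
        return Math.min(MAX_PAGE_SIZE, Math.max(MIN_PAGE_SIZE, size));
    }

    public static void main(String[] args) {
        // Plenty of memory: the page size caps out at 64 MB.
        System.out.println(pageSizeBytes(64L << 30, 4));
        // Low-memory local mode with many cores: shrinks toward the 1 MB floor.
        System.out.println(pageSizeBytes(256L << 20, 32));
    }
}
```

Under this kind of heuristic, a low-memory `local[32]` shell gets a small page per task instead of 32 tasks each grabbing 64MB up front, which is what caused the OOMs.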

cc ooq JoshRosen

## How was this patch tested?

Tested manually with `bin/spark-shell --master=local[32]` and verifying that `(1 to math.pow(10, 3).toInt).toDF("n").withColumn("m", 'n % 2).groupBy('m).agg(sum('n)).show` does not crash.

Author: Eric Liang <ekl@databricks.com>

Closes #15016 from ericl/sc-4483.
parent 78d5d4dd
@@ -37,19 +37,18 @@ import org.slf4j.LoggerFactory;
  * We use `FixedLengthRowBasedKeyValueBatch` if all fields in the key and the value are fixed-length
  * data types. Otherwise we use `VariableLengthRowBasedKeyValueBatch`.
  *
- * RowBasedKeyValueBatch is backed by a single page / MemoryBlock (defaults to 64MB). If the page
- * is full, the aggregate logic should fallback to a second level, larger hash map. We intentionally
- * use the single-page design because it simplifies memory address encoding & decoding for each
- * key-value pair. Because the maximum capacity for RowBasedKeyValueBatch is only 2^16, it is
- * unlikely we need a second page anyway. Filling the page requires an average size for key value
- * pairs to be larger than 1024 bytes.
+ * RowBasedKeyValueBatch is backed by a single page / MemoryBlock (ranges from 1 to 64MB depending
+ * on the system configuration). If the page is full, the aggregate logic should fallback to a
+ * second level, larger hash map. We intentionally use the single-page design because it simplifies
+ * memory address encoding & decoding for each key-value pair. Because the maximum capacity for
+ * RowBasedKeyValueBatch is only 2^16, it is unlikely we need a second page anyway. Filling the
+ * page requires an average size for key value pairs to be larger than 1024 bytes.
  *
  */
 public abstract class RowBasedKeyValueBatch extends MemoryConsumer {
   protected final Logger logger = LoggerFactory.getLogger(RowBasedKeyValueBatch.class);
   private static final int DEFAULT_CAPACITY = 1 << 16;
-  private static final long DEFAULT_PAGE_SIZE = 64 * 1024 * 1024;
   protected final StructType keySchema;
   protected final StructType valueSchema;
@@ -105,7 +104,7 @@ public abstract class RowBasedKeyValueBatch extends MemoryConsumer {
     this.keyRow = new UnsafeRow(keySchema.length());
     this.valueRow = new UnsafeRow(valueSchema.length());
-    if (!acquirePage(DEFAULT_PAGE_SIZE)) {
+    if (!acquirePage(manager.pageSizeBytes())) {
       page = null;
       recordStartOffset = 0;
     } else {