Commit a411a40d authored by zsxwing, committed by Andrew Or

[SPARK-7913] [CORE] Increase the maximum capacity of PartitionedPairBuffer, PartitionedSerializedPairBuffer and AppendOnlyMap

The previous growing strategy always doubled the capacity.

This PR adjusts the growing strategy: double the capacity, but if doubling would overflow, use the maximum capacity as the new capacity instead. It increases the maximum capacity of PartitionedPairBuffer from `2 ^ 29` to `(2 ^ 30) - 1`, the maximum capacity of PartitionedSerializedPairBuffer from `2 ^ 28` to `(2 ^ 29) - 1`, and the maximum capacity of AppendOnlyMap from `0.7 * (2 ^ 29)` to `2 ^ 29`.
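
In other words, growth still doubles as long as doubling stays within the limit; only the final step is clamped. A minimal sketch of the two strategies (illustrative names only, not the exact Spark code; `maxCapacity` stands in for the per-collection limit introduced here):

```scala
object GrowthStrategySketch {
  // Old behaviour (simplified): refuse to grow once the collection is at the limit.
  def oldGrownCapacity(capacity: Int, maxCapacity: Int): Int = {
    if (capacity == maxCapacity) {
      throw new IllegalStateException("Can't grow any further")
    }
    capacity * 2
  }

  // New behaviour: double when possible; if doubling overflows Int or exceeds the
  // limit, fall back to the maximum capacity instead of failing.
  def newGrownCapacity(capacity: Int, maxCapacity: Int): Int = {
    val doubled = capacity * 2
    if (doubled < 0 || doubled > maxCapacity) maxCapacity else doubled
  }
}
```

With `maxCapacity = Int.MaxValue / 2`, for example, `newGrownCapacity(1 << 29, maxCapacity)` returns `1073741823` (the new cap) rather than throwing, whereas under the old rule a buffer at capacity `1 << 29` could not grow at all.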

Author: zsxwing <zsxwing@gmail.com>

Closes #6456 from zsxwing/SPARK-7913 and squashes the following commits:

abcb932 [zsxwing] Address comments
e30b61b [zsxwing] Increase the maximum capacity of AppendOnlyMap
05b6420 [zsxwing] Update the exception message
64fe227 [zsxwing] Increase the maximum capacity of PartitionedPairBuffer and PartitionedSerializedPairBuffer
Parent: 0fc4b96f
core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala

@@ -32,12 +32,18 @@ import org.apache.spark.annotation.DeveloperApi
  * size, which is guaranteed to explore all spaces for each key (see
  * http://en.wikipedia.org/wiki/Quadratic_probing).
  *
+ * The map can support up to `536870912 (2 ^ 29)` elements.
+ *
  * TODO: Cache the hash values of each key? java.util.HashMap does that.
  */
 @DeveloperApi
 class AppendOnlyMap[K, V](initialCapacity: Int = 64)
   extends Iterable[(K, V)] with Serializable {
-  require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+
+  import AppendOnlyMap._
+
+  require(initialCapacity <= MAXIMUM_CAPACITY,
+    s"Can't make capacity bigger than ${MAXIMUM_CAPACITY} elements")
   require(initialCapacity >= 1, "Invalid initial capacity")

   private val LOAD_FACTOR = 0.7
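
The class comment above refers to quadratic probing. As a small self-contained illustration (names are illustrative, not Spark's), probing with cumulative offsets 1, 2, 3, ... is a quadratic-probing variant that visits every slot of a power-of-two table exactly once before repeating:

```scala
object ProbingSketch {
  def probeSequence(start: Int, capacity: Int): Seq[Int] = {
    val mask = capacity - 1                  // assumes capacity is a power of two
    val buf = scala.collection.mutable.ArrayBuffer.empty[Int]
    var pos = start & mask
    var delta = 1
    buf += pos
    while (buf.size < capacity) {
      pos = (pos + delta) & mask             // cumulative offsets: +1, +2, +3, ...
      delta += 1
      buf += pos
    }
    buf.toList
  }

  def main(args: Array[String]): Unit = {
    val seq = probeSequence(start = 5, capacity = 8)
    println(seq.mkString(", "))              // 5, 6, 0, 3, 7, 4, 2, 1
    assert(seq.distinct.size == 8)           // every slot of the table is explored
  }
}
```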
@@ -193,8 +199,11 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)

   /** Increase table size by 1, rehashing if necessary */
   private def incrementSize() {
+    if (curSize == MAXIMUM_CAPACITY) {
+      throw new IllegalStateException(s"Can't put more than ${MAXIMUM_CAPACITY} elements")
+    }
     curSize += 1
-    if (curSize > growThreshold) {
+    if (curSize > growThreshold && capacity < MAXIMUM_CAPACITY) {
       growTable()
     }
   }

@@ -206,12 +215,8 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)

   /** Double the table's size and re-hash everything */
   protected def growTable() {
-    val newCapacity = capacity * 2
-    if (newCapacity >= (1 << 30)) {
-      // We can't make the table this big because we want an array of 2x
-      // that size for our data, but array sizes are at most Int.MaxValue
-      throw new Exception("Can't make capacity bigger than 2^29 elements")
-    }
+    // capacity < MAXIMUM_CAPACITY (2 ^ 29) so capacity * 2 won't overflow
+    val newCapacity = (capacity * 2).min(MAXIMUM_CAPACITY)
     val newData = new Array[AnyRef](2 * newCapacity)
     val newMask = newCapacity - 1
     // Insert all our old values into the new array. Note that because our old keys are

@@ -292,3 +297,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
    */
   def atGrowThreshold: Boolean = curSize == growThreshold
 }
+
+private object AppendOnlyMap {
+  val MAXIMUM_CAPACITY = (1 << 29)
+}
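
For context on the new AppendOnlyMap limit: the map keeps keys and values interleaved in a single Array[AnyRef] of length 2 * capacity, and the table size is kept a power of two for the bit-mask probing, so `1 << 29` is the largest capacity whose backing array length still fits in a positive Int. A quick arithmetic check (plain Scala, not Spark code):

```scala
object AppendOnlyMapCapacityCheck {
  def main(args: Array[String]): Unit = {
    val maxCapacity = 1 << 29
    println(2L * maxCapacity)             // 1073741824 (2 ^ 30), a valid array length
    println(2L * (1 << 30))               // 2147483648 (2 ^ 31), exceeds Int.MaxValue
    println(Int.MaxValue)                 // 2147483647
    println((0.7 * (1 << 29)).toLong)     // 375809638, the old effective element limit
                                          // (0.7 * 2 ^ 29, per the commit message)
  }
}
```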
core/src/main/scala/org/apache/spark/util/collection/PartitionedPairBuffer.scala

@@ -25,11 +25,16 @@ import org.apache.spark.util.collection.WritablePartitionedPairCollection._
 /**
  * Append-only buffer of key-value pairs, each with a corresponding partition ID, that keeps track
  * of its estimated size in bytes.
+ *
+ * The buffer can support up to `1073741823 (2 ^ 30 - 1)` elements.
  */
 private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
   extends WritablePartitionedPairCollection[K, V] with SizeTracker
 {
-  require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+  import PartitionedPairBuffer._
+
+  require(initialCapacity <= MAXIMUM_CAPACITY,
+    s"Can't make capacity bigger than ${MAXIMUM_CAPACITY} elements")
   require(initialCapacity >= 1, "Invalid initial capacity")

   // Basic growable array data structure. We use a single array of AnyRef to hold both the keys

@@ -51,11 +56,15 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)

   /** Double the size of the array because we've reached capacity */
   private def growArray(): Unit = {
-    if (capacity == (1 << 29)) {
-      // Doubling the capacity would create an array bigger than Int.MaxValue, so don't
-      throw new Exception("Can't grow buffer beyond 2^29 elements")
+    if (capacity >= MAXIMUM_CAPACITY) {
+      throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")
     }
-    val newCapacity = capacity * 2
+    val newCapacity =
+      if (capacity * 2 < 0 || capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
+        MAXIMUM_CAPACITY
+      } else {
+        capacity * 2
+      }
     val newArray = new Array[AnyRef](2 * newCapacity)
     System.arraycopy(data, 0, newArray, 0, 2 * capacity)
     data = newArray

@@ -86,3 +95,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
     }
   }
 }
+
+private object PartitionedPairBuffer {
+  val MAXIMUM_CAPACITY = Int.MaxValue / 2 // 2 ^ 30 - 1
+}
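
Similarly for PartitionedPairBuffer: each inserted element occupies two slots (key and value) in one Array[AnyRef], which is why the new cap is `Int.MaxValue / 2` rather than a power of two. A quick arithmetic check (plain Scala, not Spark code):

```scala
object PartitionedPairBufferLimitCheck {
  def main(args: Array[String]): Unit = {
    val maxCapacity = Int.MaxValue / 2
    println(maxCapacity)          // 1073741823 (2 ^ 30 - 1), up from 2 ^ 29 = 536870912
    println(2L * maxCapacity)     // 2147483646 backing-array slots, still within Int.MaxValue
  }
}
```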
core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala

@@ -48,6 +48,8 @@ import org.apache.spark.util.collection.PartitionedSerializedPairBuffer._
  * |         keyStart         | keyValLen  | partitionId |
  * +-------------+------------+------------+-------------+
  *
+ * The buffer can support up to `536870911 (2 ^ 29 - 1)` records.
+ *
  * @param metaInitialRecords The initial number of entries in the metadata buffer.
  * @param kvBlockSize The size of each byte buffer in the ChainedBuffer used to store the records.
  * @param serializerInstance the serializer used for serializing inserted records.

@@ -63,6 +65,8 @@ private[spark] class PartitionedSerializedPairBuffer[K, V](
       " Java-serialized objects.")
   }

+  require(metaInitialRecords <= MAXIMUM_RECORDS,
+    s"Can't make capacity bigger than ${MAXIMUM_RECORDS} records")
   private var metaBuffer = IntBuffer.allocate(metaInitialRecords * RECORD_SIZE)

   private val kvBuffer: ChainedBuffer = new ChainedBuffer(kvBlockSize)

@@ -89,11 +93,17 @@

   /** Double the size of the array because we've reached capacity */
   private def growMetaBuffer(): Unit = {
-    if (metaBuffer.capacity.toLong * 2 > Int.MaxValue) {
-      // Doubling the capacity would create an array bigger than Int.MaxValue, so don't
-      throw new Exception(s"Can't grow buffer beyond ${Int.MaxValue} bytes")
+    if (metaBuffer.capacity >= MAXIMUM_META_BUFFER_CAPACITY) {
+      throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_RECORDS} records")
     }
-    val newMetaBuffer = IntBuffer.allocate(metaBuffer.capacity * 2)
+    val newCapacity =
+      if (metaBuffer.capacity * 2 < 0 || metaBuffer.capacity * 2 > MAXIMUM_META_BUFFER_CAPACITY) {
+        // Overflow
+        MAXIMUM_META_BUFFER_CAPACITY
+      } else {
+        metaBuffer.capacity * 2
+      }
+    val newMetaBuffer = IntBuffer.allocate(newCapacity)
     newMetaBuffer.put(metaBuffer.array)
     metaBuffer = newMetaBuffer
   }

@@ -247,12 +257,15 @@ private[spark] class SerializedSortDataFormat extends SortDataFormat[Int, IntBuffer]
   }
 }

-private[spark] object PartitionedSerializedPairBuffer {
+private object PartitionedSerializedPairBuffer {
   val KEY_START = 0 // keyStart, a long, gets split across two ints
   val KEY_VAL_LEN = 2
   val PARTITION = 3
   val RECORD_SIZE = PARTITION + 1 // num ints of metadata

+  val MAXIMUM_RECORDS = Int.MaxValue / RECORD_SIZE // (2 ^ 29) - 1
+  val MAXIMUM_META_BUFFER_CAPACITY = MAXIMUM_RECORDS * RECORD_SIZE // (2 ^ 31) - 4
+
   def getKeyStartPos(metaBuffer: IntBuffer, metaBufferPos: Int): Long = {
     val lower32 = metaBuffer.get(metaBufferPos + KEY_START)
     val upper32 = metaBuffer.get(metaBufferPos + KEY_START + 1)
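
For PartitionedSerializedPairBuffer, the limit follows from the metadata layout shown above: each record takes RECORD_SIZE = 4 ints (two for keyStart, one for keyValLen, one for partitionId), and the metadata lives in a single IntBuffer, so at most `Int.MaxValue / 4` whole records fit. A quick arithmetic check (plain Scala, not Spark code):

```scala
object SerializedPairBufferLimitCheck {
  def main(args: Array[String]): Unit = {
    val recordSize = 4                    // keyStart (2 ints) + keyValLen + partitionId
    val maxRecords = Int.MaxValue / recordSize
    println(maxRecords)                   // 536870911 (2 ^ 29 - 1)
    println(maxRecords * recordSize)      // 2147483644 (2 ^ 31 - 4) ints in the meta buffer
    println(1 << 28)                      // 268435456, the previous record limit per the commit message
  }
}
```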