Skip to content
Snippets Groups Projects
Commit afb21bf2 authored by Kazuaki Ishizaki's avatar Kazuaki Ishizaki Committed by Wenchen Fan
Browse files

[SPARK-20537][CORE] Fixing OffHeapColumnVector reallocation

## What changes were proposed in this pull request?

As #17773 revealed `OnHeapColumnVector` may copy a part of the original storage.

`OffHeapColumnVector` reallocation also copies to the new storage data up to 'elementsAppended'. This variable is only updated when using the `ColumnVector.appendX` API, while `ColumnVector.putX` is more commonly used.
This PR copies the new storage data up to the previously-allocated size in`OffHeapColumnVector`.

## How was this patch tested?

Existing test suites

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #17811 from kiszk/SPARK-20537.
parent 90d77e97
No related branches found
No related tags found
No related merge requests found
......@@ -436,28 +436,29 @@ public final class OffHeapColumnVector extends ColumnVector {
// Split out the slow path.
@Override
protected void reserveInternal(int newCapacity) {
int oldCapacity = (this.data == 0L) ? 0 : capacity;
if (this.resultArray != null) {
this.lengthData =
Platform.reallocateMemory(lengthData, elementsAppended * 4, newCapacity * 4);
Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);
this.offsetData =
Platform.reallocateMemory(offsetData, elementsAppended * 4, newCapacity * 4);
Platform.reallocateMemory(offsetData, oldCapacity * 4, newCapacity * 4);
} else if (type instanceof ByteType || type instanceof BooleanType) {
this.data = Platform.reallocateMemory(data, elementsAppended, newCapacity);
this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity);
} else if (type instanceof ShortType) {
this.data = Platform.reallocateMemory(data, elementsAppended * 2, newCapacity * 2);
this.data = Platform.reallocateMemory(data, oldCapacity * 2, newCapacity * 2);
} else if (type instanceof IntegerType || type instanceof FloatType ||
type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
this.data = Platform.reallocateMemory(data, elementsAppended * 4, newCapacity * 4);
this.data = Platform.reallocateMemory(data, oldCapacity * 4, newCapacity * 4);
} else if (type instanceof LongType || type instanceof DoubleType ||
DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) {
this.data = Platform.reallocateMemory(data, elementsAppended * 8, newCapacity * 8);
this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8);
} else if (resultStruct != null) {
// Nothing to store.
} else {
throw new RuntimeException("Unhandled " + type);
}
this.nulls = Platform.reallocateMemory(nulls, elementsAppended, newCapacity);
Platform.setMemory(nulls + elementsAppended, (byte)0, newCapacity - elementsAppended);
this.nulls = Platform.reallocateMemory(nulls, oldCapacity, newCapacity);
Platform.setMemory(nulls + oldCapacity, (byte)0, newCapacity - oldCapacity);
capacity = newCapacity;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment