diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index d5daaf99df3ab082c2938086d6ad0acfed8e2426..0b276e6c773dabe2bf8f7dc362451110ec6d9b49 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -56,7 +56,7 @@ import org.apache.spark.unsafe.types.UTF8String; * * ColumnVectors are intended to be reused. */ -public abstract class ColumnVector { +public abstract class ColumnVector implements AutoCloseable { /** * Allocates a column to store elements of `type` on or off heap. * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 708a00953abde6e0de129730647e6af61bf256e2..e97276800daa8b708a652b69067f834bd53dbf29 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -387,35 +387,49 @@ public final class OnHeapColumnVector extends ColumnVector { arrayLengths = newLengths; arrayOffsets = newOffsets; } else if (type instanceof BooleanType) { - byte[] newData = new byte[newCapacity]; - if (byteData != null) System.arraycopy(byteData, 0, newData, 0, elementsAppended); - byteData = newData; + if (byteData == null || byteData.length < newCapacity) { + byte[] newData = new byte[newCapacity]; + if (byteData != null) System.arraycopy(byteData, 0, newData, 0, elementsAppended); + byteData = newData; + } } else if (type instanceof ByteType) { - byte[] newData = new byte[newCapacity]; - if (byteData != null) System.arraycopy(byteData, 0, newData, 0, elementsAppended); - byteData = newData; + if (byteData == null || byteData.length < newCapacity) { + byte[] newData = new byte[newCapacity]; + if (byteData != null) System.arraycopy(byteData, 0, newData, 0, elementsAppended); + byteData = newData; + } } else if (type instanceof ShortType) { - short[] newData = new short[newCapacity]; - if (shortData != null) System.arraycopy(shortData, 0, newData, 0, elementsAppended); - shortData = newData; + if (shortData == null || shortData.length < newCapacity) { + short[] newData = new short[newCapacity]; + if (shortData != null) System.arraycopy(shortData, 0, newData, 0, elementsAppended); + shortData = newData; + } } else if (type instanceof IntegerType || type instanceof DateType || DecimalType.is32BitDecimalType(type)) { - int[] newData = new int[newCapacity]; - if (intData != null) System.arraycopy(intData, 0, newData, 0, elementsAppended); - intData = newData; + if (intData == null || intData.length < newCapacity) { + int[] newData = new int[newCapacity]; + if (intData != null) System.arraycopy(intData, 0, newData, 0, elementsAppended); + intData = newData; + } } else if (type instanceof LongType || type instanceof TimestampType || DecimalType.is64BitDecimalType(type)) { - long[] newData = new long[newCapacity]; - if (longData != null) System.arraycopy(longData, 0, newData, 0, elementsAppended); - longData = newData; + if (longData == null || longData.length < newCapacity) { + long[] newData = new long[newCapacity]; + if (longData != null) System.arraycopy(longData, 0, newData, 0, elementsAppended); + longData = newData; + } } else if (type instanceof FloatType) { - float[] newData = new float[newCapacity]; - if (floatData != null) System.arraycopy(floatData, 0, newData, 0, elementsAppended); - floatData = newData; + if (floatData == null || floatData.length < newCapacity) { + float[] newData = new float[newCapacity]; + if (floatData != null) System.arraycopy(floatData, 0, newData, 0, elementsAppended); + floatData = newData; + } } else if (type instanceof DoubleType) { - double[] newData = new double[newCapacity]; - if (doubleData != null) System.arraycopy(doubleData, 0, newData, 0, elementsAppended); - doubleData = newData; + if (doubleData == null || doubleData.length < newCapacity) { + double[] newData = new double[newCapacity]; + if (doubleData != null) System.arraycopy(doubleData, 0, newData, 0, elementsAppended); + doubleData = newData; + } } else if (resultStruct != null) { // Nothing to store. } else {