Skip to content
Snippets Groups Projects
Commit c09b31eb authored by Kazuaki Ishizaki's avatar Kazuaki Ishizaki Committed by Wenchen Fan
Browse files

[SPARK-21217][SQL] Support ColumnVector.Array.to<type>Array()

## What changes were proposed in this pull request?

This PR implements bulk-copy for `ColumnVector.Array.to<type>Array()` methods (e.g. `toIntArray()`) in `ColumnVector.Array` by using `System.arrayCopy()` or `Platform.copyMemory()`.

Before this PR, when one of these method is called, the generic method in `ArrayData` is called. It is not fast since element-wise copy is performed.

This PR can improve performance of a benchmark program by 1.9x and 3.2x.

Without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_131-8u131-b11-0ubuntu1.16.04.2-b11 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz

Int Array                                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)
------------------------------------------------------------------------------------------------
ON_HEAP                                        586 /  628         14.3          69.9
OFF_HEAP                                       893 /  902          9.4         106.5
```

With this PR
```
OpenJDK 64-Bit Server VM 1.8.0_131-8u131-b11-0ubuntu1.16.04.2-b11 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz

Int Array                                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)
------------------------------------------------------------------------------------------------
ON_HEAP                                        306 /  331         27.4          36.4
OFF_HEAP                                       282 /  287         29.8          33.6
```

Source program
```
    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
      val len = 8 * 1024 * 1024
      val column = ColumnVector.allocate(len * 2, new ArrayType(IntegerType, false), memMode)

      val data = column.arrayData
      var i = 0
      while (i < len) {
        data.putInt(i, i)
        i += 1
      }
      column.putArray(0, 0, len)

      val benchmark = new Benchmark("Int Array", len, minNumIters = 20)
      benchmark.addCase(s"$memMode") { iter =>
        var i = 0
        while (i < 50) {
          column.getArray(0).toIntArray
          i += 1
        }
      }
      benchmark.run
    }}
```

## How was this patch tested?

Added test suite

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #18425 from kiszk/SPARK-21217.
parent 53c2eb59
No related branches found
No related tags found
No related merge requests found
...@@ -100,6 +100,27 @@ public abstract class ColumnVector implements AutoCloseable { ...@@ -100,6 +100,27 @@ public abstract class ColumnVector implements AutoCloseable {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public boolean[] toBooleanArray() { return data.getBooleans(offset, length); }
@Override
public byte[] toByteArray() { return data.getBytes(offset, length); }
@Override
public short[] toShortArray() { return data.getShorts(offset, length); }
@Override
public int[] toIntArray() { return data.getInts(offset, length); }
@Override
public long[] toLongArray() { return data.getLongs(offset, length); }
@Override
public float[] toFloatArray() { return data.getFloats(offset, length); }
@Override
public double[] toDoubleArray() { return data.getDoubles(offset, length); }
// TODO: this is extremely expensive. // TODO: this is extremely expensive.
@Override @Override
public Object[] array() { public Object[] array() {
...@@ -366,6 +387,11 @@ public abstract class ColumnVector implements AutoCloseable { ...@@ -366,6 +387,11 @@ public abstract class ColumnVector implements AutoCloseable {
*/ */
public abstract boolean getBoolean(int rowId); public abstract boolean getBoolean(int rowId);
/**
* Gets values from [rowId, rowId + count)
*/
public abstract boolean[] getBooleans(int rowId, int count);
/** /**
* Sets the value at rowId to `value`. * Sets the value at rowId to `value`.
*/ */
...@@ -386,6 +412,11 @@ public abstract class ColumnVector implements AutoCloseable { ...@@ -386,6 +412,11 @@ public abstract class ColumnVector implements AutoCloseable {
*/ */
public abstract byte getByte(int rowId); public abstract byte getByte(int rowId);
/**
* Gets values from [rowId, rowId + count)
*/
public abstract byte[] getBytes(int rowId, int count);
/** /**
* Sets the value at rowId to `value`. * Sets the value at rowId to `value`.
*/ */
...@@ -406,6 +437,11 @@ public abstract class ColumnVector implements AutoCloseable { ...@@ -406,6 +437,11 @@ public abstract class ColumnVector implements AutoCloseable {
*/ */
public abstract short getShort(int rowId); public abstract short getShort(int rowId);
/**
* Gets values from [rowId, rowId + count)
*/
public abstract short[] getShorts(int rowId, int count);
/** /**
* Sets the value at rowId to `value`. * Sets the value at rowId to `value`.
*/ */
...@@ -432,6 +468,11 @@ public abstract class ColumnVector implements AutoCloseable { ...@@ -432,6 +468,11 @@ public abstract class ColumnVector implements AutoCloseable {
*/ */
public abstract int getInt(int rowId); public abstract int getInt(int rowId);
/**
* Gets values from [rowId, rowId + count)
*/
public abstract int[] getInts(int rowId, int count);
/** /**
* Returns the dictionary Id for rowId. * Returns the dictionary Id for rowId.
* This should only be called when the ColumnVector is dictionaryIds. * This should only be called when the ColumnVector is dictionaryIds.
...@@ -465,6 +506,11 @@ public abstract class ColumnVector implements AutoCloseable { ...@@ -465,6 +506,11 @@ public abstract class ColumnVector implements AutoCloseable {
*/ */
public abstract long getLong(int rowId); public abstract long getLong(int rowId);
/**
* Gets values from [rowId, rowId + count)
*/
public abstract long[] getLongs(int rowId, int count);
/** /**
* Sets the value at rowId to `value`. * Sets the value at rowId to `value`.
*/ */
...@@ -491,6 +537,11 @@ public abstract class ColumnVector implements AutoCloseable { ...@@ -491,6 +537,11 @@ public abstract class ColumnVector implements AutoCloseable {
*/ */
public abstract float getFloat(int rowId); public abstract float getFloat(int rowId);
/**
* Gets values from [rowId, rowId + count)
*/
public abstract float[] getFloats(int rowId, int count);
/** /**
* Sets the value at rowId to `value`. * Sets the value at rowId to `value`.
*/ */
...@@ -517,6 +568,11 @@ public abstract class ColumnVector implements AutoCloseable { ...@@ -517,6 +568,11 @@ public abstract class ColumnVector implements AutoCloseable {
*/ */
public abstract double getDouble(int rowId); public abstract double getDouble(int rowId);
/**
* Gets values from [rowId, rowId + count)
*/
public abstract double[] getDoubles(int rowId, int count);
/** /**
* Puts a byte array that already exists in this column. * Puts a byte array that already exists in this column.
*/ */
......
...@@ -134,6 +134,16 @@ public final class OffHeapColumnVector extends ColumnVector { ...@@ -134,6 +134,16 @@ public final class OffHeapColumnVector extends ColumnVector {
@Override @Override
public boolean getBoolean(int rowId) { return Platform.getByte(null, data + rowId) == 1; } public boolean getBoolean(int rowId) { return Platform.getByte(null, data + rowId) == 1; }
@Override
public boolean[] getBooleans(int rowId, int count) {
assert(dictionary == null);
boolean[] array = new boolean[count];
for (int i = 0; i < count; ++i) {
array[i] = (Platform.getByte(null, data + rowId + i) == 1);
}
return array;
}
// //
// APIs dealing with Bytes // APIs dealing with Bytes
// //
...@@ -165,6 +175,14 @@ public final class OffHeapColumnVector extends ColumnVector { ...@@ -165,6 +175,14 @@ public final class OffHeapColumnVector extends ColumnVector {
} }
} }
@Override
public byte[] getBytes(int rowId, int count) {
assert(dictionary == null);
byte[] array = new byte[count];
Platform.copyMemory(null, data + rowId, array, Platform.BYTE_ARRAY_OFFSET, count);
return array;
}
// //
// APIs dealing with shorts // APIs dealing with shorts
// //
...@@ -197,6 +215,14 @@ public final class OffHeapColumnVector extends ColumnVector { ...@@ -197,6 +215,14 @@ public final class OffHeapColumnVector extends ColumnVector {
} }
} }
@Override
public short[] getShorts(int rowId, int count) {
assert(dictionary == null);
short[] array = new short[count];
Platform.copyMemory(null, data + rowId * 2, array, Platform.SHORT_ARRAY_OFFSET, count * 2);
return array;
}
// //
// APIs dealing with ints // APIs dealing with ints
// //
...@@ -244,6 +270,14 @@ public final class OffHeapColumnVector extends ColumnVector { ...@@ -244,6 +270,14 @@ public final class OffHeapColumnVector extends ColumnVector {
} }
} }
@Override
public int[] getInts(int rowId, int count) {
assert(dictionary == null);
int[] array = new int[count];
Platform.copyMemory(null, data + rowId * 4, array, Platform.INT_ARRAY_OFFSET, count * 4);
return array;
}
/** /**
* Returns the dictionary Id for rowId. * Returns the dictionary Id for rowId.
* This should only be called when the ColumnVector is dictionaryIds. * This should only be called when the ColumnVector is dictionaryIds.
...@@ -302,6 +336,14 @@ public final class OffHeapColumnVector extends ColumnVector { ...@@ -302,6 +336,14 @@ public final class OffHeapColumnVector extends ColumnVector {
} }
} }
@Override
public long[] getLongs(int rowId, int count) {
assert(dictionary == null);
long[] array = new long[count];
Platform.copyMemory(null, data + rowId * 8, array, Platform.LONG_ARRAY_OFFSET, count * 8);
return array;
}
// //
// APIs dealing with floats // APIs dealing with floats
// //
...@@ -348,6 +390,14 @@ public final class OffHeapColumnVector extends ColumnVector { ...@@ -348,6 +390,14 @@ public final class OffHeapColumnVector extends ColumnVector {
} }
} }
@Override
public float[] getFloats(int rowId, int count) {
assert(dictionary == null);
float[] array = new float[count];
Platform.copyMemory(null, data + rowId * 4, array, Platform.FLOAT_ARRAY_OFFSET, count * 4);
return array;
}
// //
// APIs dealing with doubles // APIs dealing with doubles
...@@ -395,6 +445,14 @@ public final class OffHeapColumnVector extends ColumnVector { ...@@ -395,6 +445,14 @@ public final class OffHeapColumnVector extends ColumnVector {
} }
} }
@Override
public double[] getDoubles(int rowId, int count) {
assert(dictionary == null);
double[] array = new double[count];
Platform.copyMemory(null, data + rowId * 8, array, Platform.DOUBLE_ARRAY_OFFSET, count * 8);
return array;
}
// //
// APIs dealing with Arrays. // APIs dealing with Arrays.
// //
......
...@@ -130,6 +130,16 @@ public final class OnHeapColumnVector extends ColumnVector { ...@@ -130,6 +130,16 @@ public final class OnHeapColumnVector extends ColumnVector {
return byteData[rowId] == 1; return byteData[rowId] == 1;
} }
@Override
public boolean[] getBooleans(int rowId, int count) {
assert(dictionary == null);
boolean[] array = new boolean[count];
for (int i = 0; i < count; ++i) {
array[i] = (byteData[rowId + i] == 1);
}
return array;
}
// //
// //
...@@ -162,6 +172,14 @@ public final class OnHeapColumnVector extends ColumnVector { ...@@ -162,6 +172,14 @@ public final class OnHeapColumnVector extends ColumnVector {
} }
} }
@Override
public byte[] getBytes(int rowId, int count) {
assert(dictionary == null);
byte[] array = new byte[count];
System.arraycopy(byteData, rowId, array, 0, count);
return array;
}
// //
// APIs dealing with Shorts // APIs dealing with Shorts
// //
...@@ -192,6 +210,14 @@ public final class OnHeapColumnVector extends ColumnVector { ...@@ -192,6 +210,14 @@ public final class OnHeapColumnVector extends ColumnVector {
} }
} }
@Override
public short[] getShorts(int rowId, int count) {
assert(dictionary == null);
short[] array = new short[count];
System.arraycopy(shortData, rowId, array, 0, count);
return array;
}
// //
// APIs dealing with Ints // APIs dealing with Ints
...@@ -234,6 +260,14 @@ public final class OnHeapColumnVector extends ColumnVector { ...@@ -234,6 +260,14 @@ public final class OnHeapColumnVector extends ColumnVector {
} }
} }
@Override
public int[] getInts(int rowId, int count) {
assert(dictionary == null);
int[] array = new int[count];
System.arraycopy(intData, rowId, array, 0, count);
return array;
}
/** /**
* Returns the dictionary Id for rowId. * Returns the dictionary Id for rowId.
* This should only be called when the ColumnVector is dictionaryIds. * This should only be called when the ColumnVector is dictionaryIds.
...@@ -286,6 +320,14 @@ public final class OnHeapColumnVector extends ColumnVector { ...@@ -286,6 +320,14 @@ public final class OnHeapColumnVector extends ColumnVector {
} }
} }
@Override
public long[] getLongs(int rowId, int count) {
assert(dictionary == null);
long[] array = new long[count];
System.arraycopy(longData, rowId, array, 0, count);
return array;
}
// //
// APIs dealing with floats // APIs dealing with floats
// //
...@@ -325,6 +367,14 @@ public final class OnHeapColumnVector extends ColumnVector { ...@@ -325,6 +367,14 @@ public final class OnHeapColumnVector extends ColumnVector {
} }
} }
@Override
public float[] getFloats(int rowId, int count) {
assert(dictionary == null);
float[] array = new float[count];
System.arraycopy(floatData, rowId, array, 0, count);
return array;
}
// //
// APIs dealing with doubles // APIs dealing with doubles
// //
...@@ -366,6 +416,14 @@ public final class OnHeapColumnVector extends ColumnVector { ...@@ -366,6 +416,14 @@ public final class OnHeapColumnVector extends ColumnVector {
} }
} }
@Override
public double[] getDoubles(int rowId, int count) {
assert(dictionary == null);
double[] array = new double[count];
System.arraycopy(doubleData, rowId, array, 0, count);
return array;
}
// //
// APIs dealing with Arrays // APIs dealing with Arrays
// //
......
...@@ -709,6 +709,55 @@ class ColumnarBatchSuite extends SparkFunSuite { ...@@ -709,6 +709,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
}} }}
} }
test("toArray for primitive types") {
// (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
(MemoryMode.ON_HEAP :: Nil).foreach { memMode => {
val len = 4
val columnBool = ColumnVector.allocate(len, new ArrayType(BooleanType, false), memMode)
val boolArray = Array(false, true, false, true)
boolArray.zipWithIndex.map { case (v, i) => columnBool.arrayData.putBoolean(i, v) }
columnBool.putArray(0, 0, len)
assert(columnBool.getArray(0).toBooleanArray === boolArray)
val columnByte = ColumnVector.allocate(len, new ArrayType(ByteType, false), memMode)
val byteArray = Array[Byte](0, 1, 2, 3)
byteArray.zipWithIndex.map { case (v, i) => columnByte.arrayData.putByte(i, v) }
columnByte.putArray(0, 0, len)
assert(columnByte.getArray(0).toByteArray === byteArray)
val columnShort = ColumnVector.allocate(len, new ArrayType(ShortType, false), memMode)
val shortArray = Array[Short](0, 1, 2, 3)
shortArray.zipWithIndex.map { case (v, i) => columnShort.arrayData.putShort(i, v) }
columnShort.putArray(0, 0, len)
assert(columnShort.getArray(0).toShortArray === shortArray)
val columnInt = ColumnVector.allocate(len, new ArrayType(IntegerType, false), memMode)
val intArray = Array(0, 1, 2, 3)
intArray.zipWithIndex.map { case (v, i) => columnInt.arrayData.putInt(i, v) }
columnInt.putArray(0, 0, len)
assert(columnInt.getArray(0).toIntArray === intArray)
val columnLong = ColumnVector.allocate(len, new ArrayType(LongType, false), memMode)
val longArray = Array[Long](0, 1, 2, 3)
longArray.zipWithIndex.map { case (v, i) => columnLong.arrayData.putLong(i, v) }
columnLong.putArray(0, 0, len)
assert(columnLong.getArray(0).toLongArray === longArray)
val columnFloat = ColumnVector.allocate(len, new ArrayType(FloatType, false), memMode)
val floatArray = Array(0.0F, 1.1F, 2.2F, 3.3F)
floatArray.zipWithIndex.map { case (v, i) => columnFloat.arrayData.putFloat(i, v) }
columnFloat.putArray(0, 0, len)
assert(columnFloat.getArray(0).toFloatArray === floatArray)
val columnDouble = ColumnVector.allocate(len, new ArrayType(DoubleType, false), memMode)
val doubleArray = Array(0.0, 1.1, 2.2, 3.3)
doubleArray.zipWithIndex.map { case (v, i) => columnDouble.arrayData.putDouble(i, v) }
columnDouble.putArray(0, 0, len)
assert(columnDouble.getArray(0).toDoubleArray === doubleArray)
}}
}
test("Struct Column") { test("Struct Column") {
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val schema = new StructType().add("int", IntegerType).add("double", DoubleType) val schema = new StructType().add("int", IntegerType).add("double", DoubleType)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment