diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 12fa109cec8239ccdc734e0d6d44b654ffc298ce..e988c0722bd72018677c0d0a31887469ab3b7798 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -177,7 +177,7 @@ public final class OffHeapColumnVector extends ColumnVector { @Override public void putShorts(int rowId, int count, short value) { long offset = data + 2 * rowId; - for (int i = 0; i < count; ++i, offset += 4) { + for (int i = 0; i < count; ++i, offset += 2) { Platform.putShort(null, offset, value); } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index e3943f31a48ba3318c912e083aac899a076ad758..8184d7d909f4b69961945313c8be2c1913893b70 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -119,6 +119,69 @@ class ColumnarBatchSuite extends SparkFunSuite { }} } + test("Short Apis") { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { + val seed = System.currentTimeMillis() + val random = new Random(seed) + val reference = mutable.ArrayBuffer.empty[Short] + + val column = ColumnVector.allocate(1024, ShortType, memMode) + var idx = 0 + + val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toShort).toArray + column.putShorts(idx, 2, values, 0) + reference += 1 + reference += 2 + idx += 2 + + column.putShorts(idx, 3, values, 2) + reference += 3 + reference += 4 + reference += 5 + idx += 3 + + column.putShort(idx, 9) + reference += 9 + idx += 1 + + column.putShorts(idx, 3, 4) + reference += 4 + reference += 4 + reference += 4 + idx += 3 + + while (idx < column.capacity) { + val single = random.nextBoolean() + if (single) { + val v = random.nextInt().toShort + column.putShort(idx, v) + reference += v + idx += 1 + } else { + val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx) + val v = (n + 1).toShort + column.putShorts(idx, n, v) + var i = 0 + while (i < n) { + reference += v + i += 1 + } + idx += n + } + } + + reference.zipWithIndex.foreach { v => + assert(v._1 == column.getShort(v._2), "Seed = " + seed + " Mem Mode=" + memMode) + if (memMode == MemoryMode.OFF_HEAP) { + val addr = column.valuesNativeAddress() + assert(v._1 == Platform.getShort(null, addr + 2 * v._2)) + } + } + + column.close + }} + } + test("Int Apis") { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { val seed = System.currentTimeMillis()