From bfc8c79c8dda7668cfded2a728424853a26da035 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun <dongjoon@apache.org> Date: Thu, 4 May 2017 21:04:15 +0800 Subject: [PATCH] [SPARK-20566][SQL] ColumnVector should support `appendFloats` for array ## What changes were proposed in this pull request? This PR aims to add a missing `appendFloats` API for array into **ColumnVector** class. For double type, there is `appendDoubles` for array [here](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java#L818-L824). ## How was this patch tested? Pass the Jenkins with a newly added test case. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #17836 from dongjoon-hyun/SPARK-20566. --- .../execution/vectorized/ColumnVector.java | 8 + .../vectorized/ColumnarBatchSuite.scala | 256 ++++++++++++++++-- 2 files changed, 240 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index b105e60a2d..ad267ab0c9 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -801,6 +801,14 @@ public abstract class ColumnVector implements AutoCloseable { return result; } + public final int appendFloats(int length, float[] src, int offset) { + reserve(elementsAppended + length); + int result = elementsAppended; + putFloats(elementsAppended, length, src, offset); + elementsAppended += length; + return result; + } + public final int appendDouble(double v) { reserve(elementsAppended + 1); putDouble(elementsAppended, v); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 8184d7d909..e48e3f6402 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -41,24 +41,49 @@ class ColumnarBatchSuite extends SparkFunSuite { val column = ColumnVector.allocate(1024, IntegerType, memMode) var idx = 0 assert(column.anyNullsSet() == false) + assert(column.numNulls() == 0) + + column.appendNotNull() + reference += false + assert(column.anyNullsSet() == false) + assert(column.numNulls() == 0) + + column.appendNotNulls(3) + (1 to 3).foreach(_ => reference += false) + assert(column.anyNullsSet() == false) + assert(column.numNulls() == 0) + + column.appendNull() + reference += true + assert(column.anyNullsSet()) + assert(column.numNulls() == 1) + + column.appendNulls(3) + (1 to 3).foreach(_ => reference += true) + assert(column.anyNullsSet()) + assert(column.numNulls() == 4) + + idx = column.elementsAppended column.putNotNull(idx) reference += false idx += 1 - assert(column.anyNullsSet() == false) + assert(column.anyNullsSet()) + assert(column.numNulls() == 4) column.putNull(idx) reference += true idx += 1 - assert(column.anyNullsSet() == true) - assert(column.numNulls() == 1) + assert(column.anyNullsSet()) + assert(column.numNulls() == 5) column.putNulls(idx, 3) reference += true reference += true reference += true idx += 3 - assert(column.anyNullsSet() == true) + assert(column.anyNullsSet()) + assert(column.numNulls() == 8) column.putNotNulls(idx, 4) reference += false @@ -66,8 +91,8 @@ class ColumnarBatchSuite extends SparkFunSuite { reference += false reference += false idx += 4 - assert(column.anyNullsSet() == true) - assert(column.numNulls() == 4) + assert(column.anyNullsSet()) + assert(column.numNulls() == 8) reference.zipWithIndex.foreach { v => assert(v._1 == column.isNullAt(v._2)) @@ -85,9 +110,26 @@ class ColumnarBatchSuite extends SparkFunSuite { val reference = mutable.ArrayBuffer.empty[Byte] val column = ColumnVector.allocate(1024, ByteType, memMode) - var idx = 0 - val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toByte).toArray + var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toByte).toArray + column.appendBytes(2, values, 0) + reference += 10.toByte + reference += 20.toByte + + column.appendBytes(3, values, 2) + reference += 30.toByte + reference += 40.toByte + reference += 50.toByte + + column.appendBytes(6, 60.toByte) + (1 to 6).foreach(_ => reference += 60.toByte) + + column.appendByte(70.toByte) + reference += 70.toByte + + var idx = column.elementsAppended + + values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toByte).toArray column.putBytes(idx, 2, values, 0) reference += 1 reference += 2 @@ -126,9 +168,26 @@ class ColumnarBatchSuite extends SparkFunSuite { val reference = mutable.ArrayBuffer.empty[Short] val column = ColumnVector.allocate(1024, ShortType, memMode) - var idx = 0 - val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toShort).toArray + var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toShort).toArray + column.appendShorts(2, values, 0) + reference += 10.toShort + reference += 20.toShort + + column.appendShorts(3, values, 2) + reference += 30.toShort + reference += 40.toShort + reference += 50.toShort + + column.appendShorts(6, 60.toShort) + (1 to 6).foreach(_ => reference += 60.toShort) + + column.appendShort(70.toShort) + reference += 70.toShort + + var idx = column.elementsAppended + + values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toShort).toArray column.putShorts(idx, 2, values, 0) reference += 1 reference += 2 @@ -189,9 +248,26 @@ class ColumnarBatchSuite extends SparkFunSuite { val reference = mutable.ArrayBuffer.empty[Int] val column = ColumnVector.allocate(1024, IntegerType, memMode) - var idx = 0 - val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).toArray + var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).toArray + column.appendInts(2, values, 0) + reference += 10 + reference += 20 + + column.appendInts(3, values, 2) + reference += 30 + reference += 40 + reference += 50 + + column.appendInts(6, 60) + (1 to 6).foreach(_ => reference += 60) + + column.appendInt(70) + reference += 70 + + var idx = column.elementsAppended + + values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).toArray column.putInts(idx, 2, values, 0) reference += 1 reference += 2 @@ -257,9 +333,26 @@ class ColumnarBatchSuite extends SparkFunSuite { val reference = mutable.ArrayBuffer.empty[Long] val column = ColumnVector.allocate(1024, LongType, memMode) - var idx = 0 - val values = (1L :: 2L :: 3L :: 4L :: 5L :: Nil).toArray + var values = (10L :: 20L :: 30L :: 40L :: 50L :: Nil).toArray + column.appendLongs(2, values, 0) + reference += 10L + reference += 20L + + column.appendLongs(3, values, 2) + reference += 30L + reference += 40L + reference += 50L + + column.appendLongs(6, 60L) + (1 to 6).foreach(_ => reference += 60L) + + column.appendLong(70L) + reference += 70L + + var idx = column.elementsAppended + + values = (1L :: 2L :: 3L :: 4L :: 5L :: Nil).toArray column.putLongs(idx, 2, values, 0) reference += 1 reference += 2 @@ -320,6 +413,97 @@ class ColumnarBatchSuite extends SparkFunSuite { }} } + test("Float APIs") { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { + val seed = System.currentTimeMillis() + val random = new Random(seed) + val reference = mutable.ArrayBuffer.empty[Float] + + val column = ColumnVector.allocate(1024, FloatType, memMode) + + var values = (.1f :: .2f :: .3f :: .4f :: .5f :: Nil).toArray + column.appendFloats(2, values, 0) + reference += .1f + reference += .2f + + column.appendFloats(3, values, 2) + reference += .3f + reference += .4f + reference += .5f + + column.appendFloats(6, .6f) + (1 to 6).foreach(_ => reference += .6f) + + column.appendFloat(.7f) + reference += .7f + + var idx = column.elementsAppended + + values = (1.0f :: 2.0f :: 3.0f :: 4.0f :: 5.0f :: Nil).toArray + column.putFloats(idx, 2, values, 0) + reference += 1.0f + reference += 2.0f + idx += 2 + + column.putFloats(idx, 3, values, 2) + reference += 3.0f + reference += 4.0f + reference += 5.0f + idx += 3 + + val buffer = new Array[Byte](8) + Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234f) + Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, 1.123f) + + if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) { + // Ensure array contains Little Endian floats + val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN) + Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getFloat(0)) + Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, bb.getFloat(4)) + } + + column.putFloats(idx, 1, buffer, 4) + column.putFloats(idx + 1, 1, buffer, 0) + reference += 1.123f + reference += 2.234f + idx += 2 + + column.putFloats(idx, 2, buffer, 0) + reference += 2.234f + reference += 1.123f + idx += 2 + + while (idx < column.capacity) { + val single = random.nextBoolean() + if (single) { + val v = random.nextFloat() + column.putFloat(idx, v) + reference += v + idx += 1 + } else { + val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx) + val v = random.nextFloat() + column.putFloats(idx, n, v) + var i = 0 + while (i < n) { + reference += v + i += 1 + } + idx += n + } + } + + reference.zipWithIndex.foreach { v => + assert(v._1 == column.getFloat(v._2), "Seed = " + seed + " MemMode=" + memMode) + if (memMode == MemoryMode.OFF_HEAP) { + val addr = column.valuesNativeAddress() + assert(v._1 == Platform.getFloat(null, addr + 4 * v._2)) + } + } + column.close + }} + } + test("Double APIs") { (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { val seed = System.currentTimeMillis() @@ -327,9 +511,26 @@ class ColumnarBatchSuite extends SparkFunSuite { val reference = mutable.ArrayBuffer.empty[Double] val column = ColumnVector.allocate(1024, DoubleType, memMode) - var idx = 0 - val values = (1.0 :: 2.0 :: 3.0 :: 4.0 :: 5.0 :: Nil).toArray + var values = (.1 :: .2 :: .3 :: .4 :: .5 :: Nil).toArray + column.appendDoubles(2, values, 0) + reference += .1 + reference += .2 + + column.appendDoubles(3, values, 2) + reference += .3 + reference += .4 + reference += .5 + + column.appendDoubles(6, .6) + (1 to 6).foreach(_ => reference += .6) + + column.appendDouble(.7) + reference += .7 + + var idx = column.elementsAppended + + values = (1.0 :: 2.0 :: 3.0 :: 4.0 :: 5.0 :: Nil).toArray column.putDoubles(idx, 2, values, 0) reference += 1.0 reference += 2.0 @@ -346,8 +547,8 @@ class ColumnarBatchSuite extends SparkFunSuite { Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123) if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) { - // Ensure array contains Liitle Endian doubles - var bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN) + // Ensure array contains Little Endian doubles + val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN) Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getDouble(0)) Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, bb.getDouble(8)) } @@ -400,40 +601,47 @@ class ColumnarBatchSuite extends SparkFunSuite { val column = ColumnVector.allocate(6, BinaryType, memMode) assert(column.arrayData().elementsAppended == 0) - var idx = 0 + + val str = "string" + column.appendByteArray(str.getBytes(StandardCharsets.UTF_8), + 0, str.getBytes(StandardCharsets.UTF_8).length) + reference += str + assert(column.arrayData().elementsAppended == 6) + + var idx = column.elementsAppended val values = ("Hello" :: "abc" :: Nil).toArray column.putByteArray(idx, values(0).getBytes(StandardCharsets.UTF_8), 0, values(0).getBytes(StandardCharsets.UTF_8).length) reference += values(0) idx += 1 - assert(column.arrayData().elementsAppended == 5) + assert(column.arrayData().elementsAppended == 11) column.putByteArray(idx, values(1).getBytes(StandardCharsets.UTF_8), 0, values(1).getBytes(StandardCharsets.UTF_8).length) reference += values(1) idx += 1 - assert(column.arrayData().elementsAppended == 8) + assert(column.arrayData().elementsAppended == 14) // Just put llo val offset = column.putByteArray(idx, values(0).getBytes(StandardCharsets.UTF_8), 2, values(0).getBytes(StandardCharsets.UTF_8).length - 2) reference += "llo" idx += 1 - assert(column.arrayData().elementsAppended == 11) + assert(column.arrayData().elementsAppended == 17) // Put the same "ll" at offset. This should not allocate more memory in the column. column.putArray(idx, offset, 2) reference += "ll" idx += 1 - assert(column.arrayData().elementsAppended == 11) + assert(column.arrayData().elementsAppended == 17) // Put a long string val s = "abcdefghijklmnopqrstuvwxyz" column.putByteArray(idx, (s + s).getBytes(StandardCharsets.UTF_8)) reference += (s + s) idx += 1 - assert(column.arrayData().elementsAppended == 11 + (s + s).length) + assert(column.arrayData().elementsAppended == 17 + (s + s).length) reference.zipWithIndex.foreach { v => assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode) -- GitLab