diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 24260a60197f205f017dd555583c9adbca83be79..0c027f80d48cc9486ce4d394bf1a540e1b914260 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -100,6 +100,27 @@ public abstract class ColumnVector implements AutoCloseable {
       throw new UnsupportedOperationException();
     }
 
+    @Override
+    public boolean[] toBooleanArray() { return data.getBooleans(offset, length); }
+
+    @Override
+    public byte[] toByteArray() { return data.getBytes(offset, length); }
+
+    @Override
+    public short[] toShortArray() { return data.getShorts(offset, length); }
+
+    @Override
+    public int[] toIntArray() { return data.getInts(offset, length); }
+
+    @Override
+    public long[] toLongArray() { return data.getLongs(offset, length); }
+
+    @Override
+    public float[] toFloatArray() { return data.getFloats(offset, length); }
+
+    @Override
+    public double[] toDoubleArray() { return data.getDoubles(offset, length); }
+
     // TODO: this is extremely expensive.
     @Override
     public Object[] array() {
@@ -366,6 +387,11 @@ public abstract class ColumnVector implements AutoCloseable {
    */
   public abstract boolean getBoolean(int rowId);
 
+  /**
+   * Gets values from [rowId, rowId + count)
+   */
+  public abstract boolean[] getBooleans(int rowId, int count);
+
   /**
    * Sets the value at rowId to `value`.
    */
@@ -386,6 +412,11 @@ public abstract class ColumnVector implements AutoCloseable {
    */
   public abstract byte getByte(int rowId);
 
+  /**
+   * Gets values from [rowId, rowId + count)
+   */
+  public abstract byte[] getBytes(int rowId, int count);
+
   /**
    * Sets the value at rowId to `value`.
    */
@@ -406,6 +437,11 @@ public abstract class ColumnVector implements AutoCloseable {
    */
   public abstract short getShort(int rowId);
 
+  /**
+   * Gets values from [rowId, rowId + count)
+   */
+  public abstract short[] getShorts(int rowId, int count);
+
   /**
    * Sets the value at rowId to `value`.
    */
@@ -432,6 +468,11 @@ public abstract class ColumnVector implements AutoCloseable {
    */
   public abstract int getInt(int rowId);
 
+  /**
+   * Gets values from [rowId, rowId + count)
+   */
+  public abstract int[] getInts(int rowId, int count);
+
   /**
    * Returns the dictionary Id for rowId.
    * This should only be called when the ColumnVector is dictionaryIds.
@@ -465,6 +506,11 @@ public abstract class ColumnVector implements AutoCloseable {
    */
   public abstract long getLong(int rowId);
 
+  /**
+   * Gets values from [rowId, rowId + count)
+   */
+  public abstract long[] getLongs(int rowId, int count);
+
   /**
    * Sets the value at rowId to `value`.
    */
@@ -491,6 +537,11 @@ public abstract class ColumnVector implements AutoCloseable {
    */
   public abstract float getFloat(int rowId);
 
+  /**
+   * Gets values from [rowId, rowId + count)
+   */
+  public abstract float[] getFloats(int rowId, int count);
+
   /**
    * Sets the value at rowId to `value`.
    */
@@ -517,6 +568,11 @@ public abstract class ColumnVector implements AutoCloseable {
    */
   public abstract double getDouble(int rowId);
 
+  /**
+   * Gets values from [rowId, rowId + count)
+   */
+  public abstract double[] getDoubles(int rowId, int count);
+
   /**
    * Puts a byte array that already exists in this column.
    */
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index a7d3744d00e91de3c412b0521563695630a22f92..2d1f3da8e74637c8b3cc97ca240f3ba584872796 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -134,6 +134,16 @@ public final class OffHeapColumnVector extends ColumnVector {
   @Override
   public boolean getBoolean(int rowId) { return Platform.getByte(null, data + rowId) == 1; }
 
+  @Override
+  public boolean[] getBooleans(int rowId, int count) {
+    assert(dictionary == null);
+    boolean[] array = new boolean[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = (Platform.getByte(null, data + rowId + i) == 1);
+    }
+    return array;
+  }
+
   //
   // APIs dealing with Bytes
   //
@@ -165,6 +175,14 @@ public final class OffHeapColumnVector extends ColumnVector {
     }
   }
 
+  @Override
+  public byte[] getBytes(int rowId, int count) {
+    assert(dictionary == null);
+    byte[] array = new byte[count];
+    Platform.copyMemory(null, data + rowId, array, Platform.BYTE_ARRAY_OFFSET, count);
+    return array;
+  }
+
   //
   // APIs dealing with shorts
   //
@@ -197,6 +215,14 @@ public final class OffHeapColumnVector extends ColumnVector {
     }
   }
 
+  @Override
+  public short[] getShorts(int rowId, int count) {
+    assert(dictionary == null);
+    short[] array = new short[count];
+    Platform.copyMemory(null, data + rowId * 2, array, Platform.SHORT_ARRAY_OFFSET, count * 2);
+    return array;
+  }
+
   //
   // APIs dealing with ints
   //
@@ -244,6 +270,14 @@ public final class OffHeapColumnVector extends ColumnVector {
     }
   }
 
+  @Override
+  public int[] getInts(int rowId, int count) {
+    assert(dictionary == null);
+    int[] array = new int[count];
+    Platform.copyMemory(null, data + rowId * 4, array, Platform.INT_ARRAY_OFFSET, count * 4);
+    return array;
+  }
+
   /**
    * Returns the dictionary Id for rowId.
    * This should only be called when the ColumnVector is dictionaryIds.
@@ -302,6 +336,14 @@ public final class OffHeapColumnVector extends ColumnVector {
     }
   }
 
+  @Override
+  public long[] getLongs(int rowId, int count) {
+    assert(dictionary == null);
+    long[] array = new long[count];
+    Platform.copyMemory(null, data + rowId * 8, array, Platform.LONG_ARRAY_OFFSET, count * 8);
+    return array;
+  }
+
   //
   // APIs dealing with floats
   //
@@ -348,6 +390,14 @@ public final class OffHeapColumnVector extends ColumnVector {
     }
   }
 
+  @Override
+  public float[] getFloats(int rowId, int count) {
+    assert(dictionary == null);
+    float[] array = new float[count];
+    Platform.copyMemory(null, data + rowId * 4, array, Platform.FLOAT_ARRAY_OFFSET, count * 4);
+    return array;
+  }
+
 
   //
   // APIs dealing with doubles
@@ -395,6 +445,14 @@ public final class OffHeapColumnVector extends ColumnVector {
     }
   }
 
+  @Override
+  public double[] getDoubles(int rowId, int count) {
+    assert(dictionary == null);
+    double[] array = new double[count];
+    Platform.copyMemory(null, data + rowId * 8, array, Platform.DOUBLE_ARRAY_OFFSET, count * 8);
+    return array;
+  }
+
   //
   // APIs dealing with Arrays.
   //
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 94ed32294cfaeef2ccd643f7688a5710d7a5a6ee..506434364be487faf33ef4d3eb1d45928a1f2d73 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -130,6 +130,16 @@ public final class OnHeapColumnVector extends ColumnVector {
     return byteData[rowId] == 1;
   }
 
+  @Override
+  public boolean[] getBooleans(int rowId, int count) {
+    assert(dictionary == null);
+    boolean[] array = new boolean[count];
+    for (int i = 0; i < count; ++i) {
+      array[i] = (byteData[rowId + i] == 1);
+    }
+    return array;
+  }
+
   //
 
   //
@@ -162,6 +172,14 @@ public final class OnHeapColumnVector extends ColumnVector {
     }
   }
 
+  @Override
+  public byte[] getBytes(int rowId, int count) {
+    assert(dictionary == null);
+    byte[] array = new byte[count];
+    System.arraycopy(byteData, rowId, array, 0, count);
+    return array;
+  }
+
   //
   // APIs dealing with Shorts
   //
@@ -192,6 +210,14 @@ public final class OnHeapColumnVector extends ColumnVector {
     }
   }
 
+  @Override
+  public short[] getShorts(int rowId, int count) {
+    assert(dictionary == null);
+    short[] array = new short[count];
+    System.arraycopy(shortData, rowId, array, 0, count);
+    return array;
+  }
+
 
   //
   // APIs dealing with Ints
@@ -234,6 +260,14 @@ public final class OnHeapColumnVector extends ColumnVector {
     }
   }
 
+  @Override
+  public int[] getInts(int rowId, int count) {
+    assert(dictionary == null);
+    int[] array = new int[count];
+    System.arraycopy(intData, rowId, array, 0, count);
+    return array;
+  }
+
   /**
    * Returns the dictionary Id for rowId.
    * This should only be called when the ColumnVector is dictionaryIds.
@@ -286,6 +320,14 @@ public final class OnHeapColumnVector extends ColumnVector {
     }
   }
 
+  @Override
+  public long[] getLongs(int rowId, int count) {
+    assert(dictionary == null);
+    long[] array = new long[count];
+    System.arraycopy(longData, rowId, array, 0, count);
+    return array;
+  }
+
   //
   // APIs dealing with floats
   //
@@ -325,6 +367,14 @@ public final class OnHeapColumnVector extends ColumnVector {
     }
   }
 
+  @Override
+  public float[] getFloats(int rowId, int count) {
+    assert(dictionary == null);
+    float[] array = new float[count];
+    System.arraycopy(floatData, rowId, array, 0, count);
+    return array;
+  }
+
   //
   // APIs dealing with doubles
   //
@@ -366,6 +416,14 @@ public final class OnHeapColumnVector extends ColumnVector {
     }
   }
 
+  @Override
+  public double[] getDoubles(int rowId, int count) {
+    assert(dictionary == null);
+    double[] array = new double[count];
+    System.arraycopy(doubleData, rowId, array, 0, count);
+    return array;
+  }
+
   //
   // APIs dealing with Arrays
   //
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 80d41577dcf2d2a02ebcab849bb171a79202d8d1..ccf7aa7022a2a73ecd95f698aa7e42108a56ece5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -709,6 +709,55 @@ class ColumnarBatchSuite extends SparkFunSuite {
     }}
   }
 
+  test("toArray for primitive types") {
+    // TODO: also run with MemoryMode.OFF_HEAP once the off-heap bulk getters are validated
+    (MemoryMode.ON_HEAP :: Nil).foreach { memMode => {
+      val len = 4
+
+      val columnBool = ColumnVector.allocate(len, new ArrayType(BooleanType, false), memMode)
+      val boolArray = Array(false, true, false, true)
+      boolArray.zipWithIndex.map { case (v, i) => columnBool.arrayData.putBoolean(i, v) }
+      columnBool.putArray(0, 0, len)
+      assert(columnBool.getArray(0).toBooleanArray === boolArray)
+
+      val columnByte = ColumnVector.allocate(len, new ArrayType(ByteType, false), memMode)
+      val byteArray = Array[Byte](0, 1, 2, 3)
+      byteArray.zipWithIndex.map { case (v, i) => columnByte.arrayData.putByte(i, v) }
+      columnByte.putArray(0, 0, len)
+      assert(columnByte.getArray(0).toByteArray === byteArray)
+
+      val columnShort = ColumnVector.allocate(len, new ArrayType(ShortType, false), memMode)
+      val shortArray = Array[Short](0, 1, 2, 3)
+      shortArray.zipWithIndex.map { case (v, i) => columnShort.arrayData.putShort(i, v) }
+      columnShort.putArray(0, 0, len)
+      assert(columnShort.getArray(0).toShortArray === shortArray)
+
+      val columnInt = ColumnVector.allocate(len, new ArrayType(IntegerType, false), memMode)
+      val intArray = Array(0, 1, 2, 3)
+      intArray.zipWithIndex.map { case (v, i) => columnInt.arrayData.putInt(i, v) }
+      columnInt.putArray(0, 0, len)
+      assert(columnInt.getArray(0).toIntArray === intArray)
+
+      val columnLong = ColumnVector.allocate(len, new ArrayType(LongType, false), memMode)
+      val longArray = Array[Long](0, 1, 2, 3)
+      longArray.zipWithIndex.map { case (v, i) => columnLong.arrayData.putLong(i, v) }
+      columnLong.putArray(0, 0, len)
+      assert(columnLong.getArray(0).toLongArray === longArray)
+
+      val columnFloat = ColumnVector.allocate(len, new ArrayType(FloatType, false), memMode)
+      val floatArray = Array(0.0F, 1.1F, 2.2F, 3.3F)
+      floatArray.zipWithIndex.map { case (v, i) => columnFloat.arrayData.putFloat(i, v) }
+      columnFloat.putArray(0, 0, len)
+      assert(columnFloat.getArray(0).toFloatArray === floatArray)
+
+      val columnDouble = ColumnVector.allocate(len, new ArrayType(DoubleType, false), memMode)
+      val doubleArray = Array(0.0, 1.1, 2.2, 3.3)
+      doubleArray.zipWithIndex.map { case (v, i) => columnDouble.arrayData.putDouble(i, v) }
+      columnDouble.putArray(0, 0, len)
+      assert(columnDouble.getArray(0).toDoubleArray === doubleArray)
+    }}
+  }
+
   test("Struct Column") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val schema = new StructType().add("int", IntegerType).add("double", DoubleType)