diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
index 457a1a05a1bf581e4c01e54a9893a0bb4c7f628f..d484cec7ae38406cabc97a1050c06803a14b729f 100644
--- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -62,10 +62,10 @@ private[spark] class Benchmark(
     val firstRate = results.head.avgRate
     // The results are going to be processor specific so it is useful to include that.
     println(Benchmark.getProcessorName())
-    printf("%-24s %16s %16s %14s\n", name + ":", "Avg Time(ms)", "Avg Rate(M/s)", "Relative Rate")
-    println("-------------------------------------------------------------------------")
+    printf("%-30s %16s %16s %14s\n", name + ":", "Avg Time(ms)", "Avg Rate(M/s)", "Relative Rate")
+    println("-------------------------------------------------------------------------------")
     results.zip(benchmarks).foreach { r =>
-      printf("%-24s %16s %16s %14s\n",
+      printf("%-30s %16s %16s %14s\n",
         r._2.name,
         "%10.2f" format r._1.avgMs,
         "%10.2f" format r._1.avgRate,
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index 47818c0939f2ae336237b67f0a7625d821d5428b..80805f15a8f06861515806e641aa8167ed2e398e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -21,10 +21,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.Encoding;
@@ -35,9 +35,12 @@ import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
+import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
 import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -102,6 +105,25 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
    */
   private static final int DEFAULT_VAR_LEN_SIZE = 32;
 
+  /**
+   * ColumnarBatch object that is used for batch decoding. This is created on first use and
+   * triggers batched decoding. It is not valid to interleave calls to the batched interface
+   * with the row-by-row RecordReader APIs.
+   * This is only enabled with additional flags for development. It is still a work in progress,
+   * and currently unsupported cases will fail with potentially hard-to-diagnose errors. It
+   * should only be turned on while developing this feature.
+   *
+   * TODOs:
+   *  - Implement all the encodings to support vectorized decoding.
+   *  - Implement v2 page formats (just make sure we create the correct decoders).
+   */
+  private ColumnarBatch columnarBatch;
+
+  /**
+   * The default memory mode for columnarBatch: on-heap or off-heap.
+   */
+  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+
   /**
    * Tries to initialize the reader for this split. Returns true if this reader supports reading
    * this split and false otherwise.
@@ -135,6 +157,15 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
     initializeInternal();
   }
 
+  @Override
+  public void close() throws IOException {
+    if (columnarBatch != null) {
+      columnarBatch.close();
+      columnarBatch = null;
+    }
+    super.close();
+  }
+
   @Override
   public boolean nextKeyValue() throws IOException, InterruptedException {
     if (batchIdx >= numBatched) {
@@ -154,6 +185,46 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
     return (float) rowsReturned / totalRowCount;
   }
 
+  /**
+   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
+   * This object is reused. Calling this enables the vectorized reader. This should be called
+   * before any calls to nextKeyValue/nextBatch.
+   */
+  public ColumnarBatch resultBatch() {
+    return resultBatch(DEFAULT_MEMORY_MODE);
+  }
+
+  public ColumnarBatch resultBatch(MemoryMode memMode) {
+    if (columnarBatch == null) {
+      columnarBatch = ColumnarBatch.allocate(sparkSchema, memMode);
+    }
+    return columnarBatch;
+  }
+
+  /**
+   * Advances to the next batch of rows. Returns false if there are no more.
+   */
+  public boolean nextBatch() throws IOException {
+    assert(columnarBatch != null);
+    columnarBatch.reset();
+    if (rowsReturned >= totalRowCount) return false;
+    checkEndOfRowGroup();
+
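+    // Read up to a full batch of rows, bounded by how many remain in the file.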
+    int num = (int)Math.min((long) columnarBatch.capacity(), totalRowCount - rowsReturned);
+    for (int i = 0; i < columnReaders.length; ++i) {
+      switch (columnReaders[i].descriptor.getType()) {
+        case INT32:
+          columnReaders[i].readIntBatch(num, columnarBatch.column(i));
+          break;
+        default:
+          throw new IOException("Unsupported type: " + columnReaders[i].descriptor.getType());
+      }
+    }
+    rowsReturned += num;
+    columnarBatch.setNumRows(num);
+    return true;
+  }
+
   private void initializeInternal() throws IOException {
     /**
      * Check that the requested schema is supported.
@@ -382,7 +453,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
    *
    * Decoder to return values from a single column.
    */
-  private static final class ColumnReader {
+  private final class ColumnReader {
     /**
      * Total number of values read.
      */
@@ -416,6 +487,10 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
     private IntIterator definitionLevelColumn;
     private ValuesReader dataColumn;
 
+    // Only set if vectorized decoding is enabled. This is used instead of the row-by-row
+    // decoding with `definitionLevelColumn`.
+    private VectorizedRleValuesReader defColumn;
+
     /**
      * Total number of values in this column (in this row group).
      */
@@ -521,6 +596,35 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
       return definitionLevelColumn.nextInt() == maxDefLevel;
     }
 
+    /**
+     * Reads `total` values from this ColumnReader into `column`.
+     * TODO: implement the other encodings.
+     */
+    private void readIntBatch(int total, ColumnVector column) throws IOException {
+      int rowId = 0;
+      while (total > 0) {
+        // Compute the number of values we want to read in this page.
+        int leftInPage = (int)(endOfPageValueCount - valuesRead);
+        if (leftInPage == 0) {
+          readPage();
+          leftInPage = (int)(endOfPageValueCount - valuesRead);
+        }
+        int num = Math.min(total, leftInPage);
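+        // Decode `num` definition levels; non-null slots are filled from the data column
+        // and null slots are flagged in the vector's null bitmap.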
+        defColumn.readIntegers(
+            num, column, rowId, maxDefLevel, (VectorizedValuesReader)dataColumn, 0);
+
+        // Remap the values if it is dictionary encoded.
+        if (useDictionary) {
+          for (int i = rowId; i < rowId + num; ++i) {
+            column.putInt(i, dictionary.decodeToInt(column.getInt(i)));
+          }
+        }
+        valuesRead += num;
+        rowId += num;
+        total -= num;
+      }
+    }
+
     private void readPage() throws IOException {
       DataPage page = pageReader.readPage();
       // TODO: Why is this a visitor?
@@ -547,21 +651,28 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
       });
     }
 
-    private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount)
-        throws IOException {
-      this.pageValueCount = valueCount;
+    private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset)
+        throws IOException {
       this.endOfPageValueCount = valuesRead + pageValueCount;
       if (dataEncoding.usesDictionary()) {
+        this.dataColumn = null;
         if (dictionary == null) {
           throw new IOException(
               "could not read page in col " + descriptor +
                   " as the dictionary was missing for encoding " + dataEncoding);
         }
-        this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(
-            descriptor, VALUES, dictionary);
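+        // Dictionary ids in a PLAIN_DICTIONARY data page are RLE/bit-packed encoded,
+        // so the vectorized path reuses the RLE decoder.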
+        if (columnarBatch != null && dataEncoding == Encoding.PLAIN_DICTIONARY) {
+          this.dataColumn = new VectorizedRleValuesReader();
+        } else {
+          this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(
+              descriptor, VALUES, dictionary);
+        }
         this.useDictionary = true;
       } else {
-        this.dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
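+        // PLAIN int32 values are fixed-width 4-byte little endian, hence the byte size
+        // of 4 passed to the vectorized plain reader.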
+        if (columnarBatch != null && dataEncoding == Encoding.PLAIN) {
+          this.dataColumn = new VectorizedPlainValuesReader(4);
+        } else {
+          this.dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
+        }
         this.useDictionary = false;
       }
 
@@ -573,8 +684,19 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
     }
 
     private void readPageV1(DataPageV1 page) throws IOException {
+      this.pageValueCount = page.getValueCount();
       ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
-      ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+      ValuesReader dlReader;
+
+      // Initialize the decoders. Use custom ones if vectorized decoding is enabled.
+      if (columnarBatch != null && page.getDlEncoding() == Encoding.RLE) {
+        int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+        assert(bitWidth != 0); // not implemented
+        this.defColumn = new VectorizedRleValuesReader(bitWidth);
+        dlReader = this.defColumn;
+      } else {
+        dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+      }
       this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
       this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
       try {
@@ -583,20 +705,20 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
         int next = rlReader.getNextOffset();
         dlReader.initFromPage(pageValueCount, bytes, next);
         next = dlReader.getNextOffset();
-        initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
+        initDataReader(page.getValueEncoding(), bytes, next);
       } catch (IOException e) {
         throw new IOException("could not read page " + page + " in col " + descriptor, e);
       }
     }
 
     private void readPageV2(DataPageV2 page) throws IOException {
+      this.pageValueCount = page.getValueCount();
       this.repetitionLevelColumn = createRLEIterator(descriptor.getMaxRepetitionLevel(),
           page.getRepetitionLevels(), descriptor);
       this.definitionLevelColumn = createRLEIterator(descriptor.getMaxDefinitionLevel(),
           page.getDefinitionLevels(), descriptor);
       try {
-        initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0,
-            page.getValueCount());
+        initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
       } catch (IOException e) {
         throw new IOException("could not read page " + page + " in col " + descriptor, e);
       }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
new file mode 100644
index 0000000000000000000000000000000000000000..dac0c52ebd2cfbbeffdf175ed7200ff4c1bb51c5
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.io.IOException;
+
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.unsafe.Platform;
+
+import org.apache.parquet.column.values.ValuesReader;
+
+/**
+ * An implementation of the Parquet PLAIN decoder that supports the vectorized interface.
+ */
+public class VectorizedPlainValuesReader extends ValuesReader implements VectorizedValuesReader {
+  private byte[] buffer;
+  private int offset;
+  private final int byteSize;
+
+  public VectorizedPlainValuesReader(int byteSize) {
+    this.byteSize = byteSize;
+  }
+
+  @Override
+  public void initFromPage(int valueCount, byte[] bytes, int offset) throws IOException {
+    this.buffer = bytes;
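+    // Fold in BYTE_ARRAY_OFFSET so the offset can be passed directly to Platform accessors.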
+    this.offset = offset + Platform.BYTE_ARRAY_OFFSET;
+  }
+
+  @Override
+  public void skip() {
+    offset += byteSize;
+  }
+
+  @Override
+  public void skip(int n) {
+    offset += n * byteSize;
+  }
+
+  @Override
+  public void readIntegers(int total, ColumnVector c, int rowId) {
+    c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
+    offset += 4 * total;
+  }
+
+  @Override
+  public int readInteger() {
+    int v = Platform.getInt(buffer, offset);
+    offset += 4;
+    return v;
+  }
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
new file mode 100644
index 0000000000000000000000000000000000000000..493ec9deed49971eb75b2e86c9ab9c0f05346a5a
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+
+/**
+ * A values reader for Parquet's run-length encoded data. This is based on the version in
+ * parquet-mr with these changes:
+ *  - Supports the vectorized interface.
+ *  - Works on byte arrays (byte[]) instead of byte streams.
+ *
+ * This encoding is used in multiple places:
+ *  - Definition/Repetition levels
+ *  - Dictionary ids.
+ */
+public final class VectorizedRleValuesReader extends ValuesReader {
+  // Current decoding mode. The encoded data contains groups of either run length encoded data
+  // (RLE) or bit packed data. Each group starts with a header that indicates the mode and
+  // the number of values in the group.
+  // More details here: https://github.com/Parquet/parquet-format/blob/master/Encodings.md
+  private enum MODE {
+    RLE,
+    PACKED
+  }
+
+  // Encoded data.
+  private byte[] in;
+  private int end;
+  private int offset;
+
+  // bit/byte width of decoded data and utility to batch unpack them.
+  private int bitWidth;
+  private int bytesWidth;
+  private BytePacker packer;
+
+  // Current decoding mode and values
+  private MODE mode;
+  private int currentCount;
+  private int currentValue;
+
+  // Buffer of decoded values if the values are PACKED.
+  private int[] currentBuffer = new int[16];
+  private int currentBufferIdx = 0;
+
+  // If true, the bit width is fixed. This decoder is used in different places and this also
+  // controls whether we need to read the bit width from the beginning of the data stream.
+  private final boolean fixedWidth;
+
+  public VectorizedRleValuesReader() {
+    fixedWidth = false;
+  }
+
+  public VectorizedRleValuesReader(int bitWidth) {
+    fixedWidth = true;
+    init(bitWidth);
+  }
+
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int start) {
+    this.offset = start;
+    this.in = page;
+    if (fixedWidth) {
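+      // Fixed-width streams (e.g. definition levels) are prefixed with their 4-byte length.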
+      int length = readIntLittleEndian();
+      this.end = this.offset + length;
+    } else {
+      this.end = page.length;
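+      // For dictionary ids, the bit width is stored in the first byte of the stream.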
+      if (this.end != this.offset) init(page[this.offset++] & 255);
+    }
+    this.currentCount = 0;
+  }
+
+  /**
+   * Initializes the internal state for decoding ints of `bitWidth`.
+   */
+  private void init(int bitWidth) {
+    Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
+    this.bitWidth = bitWidth;
+    this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
+    this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
+  }
+
+  @Override
+  public int getNextOffset() {
+    return this.end;
+  }
+
+  @Override
+  public boolean readBoolean() {
+    return this.readInteger() != 0;
+  }
+
+  @Override
+  public void skip() {
+    this.readInteger();
+  }
+
+  @Override
+  public int readValueDictionaryId() {
+    return readInteger();
+  }
+
+  @Override
+  public int readInteger() {
+    if (this.currentCount == 0) { this.readNextGroup(); }
+
+    this.currentCount--;
+    switch (mode) {
+      case RLE:
+        return this.currentValue;
+      case PACKED:
+        return this.currentBuffer[currentBufferIdx++];
+    }
+    throw new RuntimeException("Unreachable");
+  }
+
+  /**
+   * Reads `total` ints into `c`, filling them in starting at `c[rowId]`. This reader
+   * reads the definition levels and then reads from `data` for the non-null values.
+   * If the value is null, `c` will be populated with `nullValue`.
+   *
+   * This is a batched version of this logic:
+   *  if (this.readInt() == level) {
+   *    c[rowId] = data.readInteger();
+   *  } else {
+   *    c[rowId] = nullValue;
+   *  }
+   */
+  public void readIntegers(int total, ColumnVector c, int rowId, int level,
+      VectorizedValuesReader data, int nullValue) {
+    int left = total;
+    while (left > 0) {
+      if (this.currentCount == 0) this.readNextGroup();
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
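+          // The run holds a single level value, so it is either entirely non-null or
+          // entirely null.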
+          if (currentValue == level) {
+            data.readIntegers(n, c, rowId);
+            c.putNotNulls(rowId, n);
+          } else {
+            c.putNulls(rowId, n);
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (currentBuffer[currentBufferIdx++] == level) {
+              c.putInt(rowId + i, data.readInteger());
+              c.putNotNull(rowId + i);
+            } else {
+              c.putInt(rowId + i, nullValue);
+              c.putNull(rowId + i);
+            }
+          }
+          break;
+      }
+      rowId += n;
+      left -= n;
+      currentCount -= n;
+    }
+  }
+
+  /**
+   * Reads the next varint encoded int.
+   */
+  private int readUnsignedVarInt() {
+    int value = 0;
+    int shift = 0;
+    int b;
+    do {
+      b = in[offset++] & 255;
+      value |= (b & 0x7F) << shift;
+      shift += 7;
+    } while ((b & 0x80) != 0);
+    return value;
+  }
+
+  /**
+   * Reads the next 4 byte little endian int.
+   */
+  private int readIntLittleEndian() {
+    int ch4 = in[offset] & 255;
+    int ch3 = in[offset + 1] & 255;
+    int ch2 = in[offset + 2] & 255;
+    int ch1 = in[offset + 3] & 255;
+    offset += 4;
+    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+  }
+
+  /**
+   * Reads the next `bytesWidth` little endian int.
+   */
+  private int readIntLittleEndianPaddedOnBitWidth() {
+    switch (bytesWidth) {
+      case 0:
+        return 0;
+      case 1:
+        return in[offset++] & 255;
+      case 2: {
+        int ch2 = in[offset] & 255;
+        int ch1 = in[offset + 1] & 255;
+        offset += 2;
+        return (ch1 << 8) + ch2;
+      }
+      case 3: {
+        int ch3 = in[offset] & 255;
+        int ch2 = in[offset + 1] & 255;
+        int ch1 = in[offset + 2] & 255;
+        offset += 3;
+        return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
+      }
+      case 4: {
+        return readIntLittleEndian();
+      }
+    }
+    throw new RuntimeException("Unreachable");
+  }
+
+  /**
+   * Reads the next group.
+   */
+  private void readNextGroup() {
+    Preconditions.checkArgument(this.offset < this.end,
+        "Reading past RLE/BitPacking stream. offset=" + this.offset + " end=" + this.end);
+    int header = readUnsignedVarInt();
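+    // The low bit of the header selects the mode; the remaining bits hold the run
+    // length (RLE) or the number of 8-value groups (PACKED).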
+    this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+    switch (mode) {
+      case RLE:
+        this.currentCount = header >>> 1;
+        this.currentValue = readIntLittleEndianPaddedOnBitWidth();
+        return;
+      case PACKED:
+        int numGroups = header >>> 1;
+        this.currentCount = numGroups * 8;
+
+        if (this.currentBuffer.length < this.currentCount) {
+          this.currentBuffer = new int[this.currentCount];
+        }
+        currentBufferIdx = 0;
+        int bytesToRead = (int)Math.ceil((double)(this.currentCount * this.bitWidth) / 8.0D);
+
+        bytesToRead = Math.min(bytesToRead, this.end - this.offset);
+        int valueIndex = 0;
+        for (int byteIndex = offset; valueIndex < this.currentCount; byteIndex += this.bitWidth) {
+          this.packer.unpack8Values(in, byteIndex, this.currentBuffer, valueIndex);
+          valueIndex += 8;
+        }
+        offset += bytesToRead;
+        return;
+      default:
+        throw new ParquetDecodingException("not a valid mode " + this.mode);
+    }
+  }
+}
\ No newline at end of file
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
new file mode 100644
index 0000000000000000000000000000000000000000..49a9ed83d590a27cd33ce4e6a3a9c4b9014d2797
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+
+/**
+ * Interface for value decoding that supports vectorized (aka batched) decoding.
+ * TODO: merge this into parquet-mr.
+ */
+public interface VectorizedValuesReader {
+  int readInteger();
+
+  /**
+   * Reads `total` values into `c`, starting at `c[rowId]`.
+   */
+  void readIntegers(int total, ColumnVector c, int rowId);
+
+  // TODO: add all the other parquet types.
+
+  void skip(int n);
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index d9dde92ceb6d734583f37f98179c18dcf444cfe4..85509751dbbee716e21a1d2622477ee8d5ed0413 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.vectorized;
 
+import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.types.DataType;
 
 /**
@@ -33,8 +34,8 @@ public abstract class ColumnVector {
   /**
    * Allocates a column with each element of size `width` either on or off heap.
    */
-  public static ColumnVector allocate(int capacity, DataType type, boolean offHeap) {
-    if (offHeap) {
+  public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode) {
+    if (mode == MemoryMode.OFF_HEAP) {
       return new OffHeapColumnVector(capacity, type);
     } else {
       return new OnHeapColumnVector(capacity, type);
@@ -111,7 +112,7 @@ public abstract class ColumnVector {
   public abstract void putInts(int rowId, int count, int[] src, int srcIndex);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
    * The data in src must be 4-byte little endian ints.
    */
   public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
@@ -138,7 +139,7 @@ public abstract class ColumnVector {
   public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
    * The data in src must be ieee formated doubles.
    */
   public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 47defac4534dc227ca4334b27d75e9af1ba48b8d..2c55f854c2419f890b7192511cfbfaf0f3fa6daf 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.vectorized;
 import java.util.Arrays;
 import java.util.Iterator;
 
+import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
@@ -59,12 +60,12 @@ public final class ColumnarBatch {
   // Total number of rows that have been filtered.
   private int numRowsFiltered = 0;
 
-  public static ColumnarBatch allocate(StructType schema, boolean offHeap) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, offHeap);
+  public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
+    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
   }
 
-  public static ColumnarBatch allocate(StructType schema, boolean offHeap, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, offHeap);
+  public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
+    return new ColumnarBatch(schema, maxRows, memMode);
   }
 
   /**
@@ -282,7 +283,7 @@ public final class ColumnarBatch {
     ++numRowsFiltered;
   }
 
-  private ColumnarBatch(StructType schema, int maxRows, boolean offHeap) {
+  private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) {
     this.schema = schema;
     this.capacity = maxRows;
     this.columns = new ColumnVector[schema.size()];
@@ -290,7 +291,7 @@ public final class ColumnarBatch {
 
     for (int i = 0; i < schema.fields().length; ++i) {
       StructField field = schema.fields()[i];
-      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), offHeap);
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
     }
   }
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 2a9a2d1104b22bc88cd201fc5d32898b03b89480..6180dd308e5e3ffe066963c18c067b66fb0cde10 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -49,6 +49,7 @@ public final class OffHeapColumnVector extends ColumnVector {
     } else {
       throw new RuntimeException("Unhandled " + type);
     }
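+    // Mark nulls as set so the initial reset() clears the null-indicator bytes.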
+    anyNullsSet = true;
     reset();
   }
 
@@ -98,6 +99,7 @@ public final class OffHeapColumnVector extends ColumnVector {
 
   @Override
   public final void putNotNulls(int rowId, int count) {
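+    // Fast path: if no nulls have been set since the last reset, the bytes are already zero.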
+    if (!anyNullsSet) return;
     long offset = nulls + rowId;
     for (int i = 0; i < count; ++i, ++offset) {
       Platform.putByte(null, offset, (byte) 0);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index a7b3addf11b143492a0d7552540a0990abe76cc7..76d9956c3842f8d8a37eb7d5031d172b2d352b0e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -97,6 +97,7 @@ public final class OnHeapColumnVector extends ColumnVector {
 
   @Override
   public final void putNotNulls(int rowId, int count) {
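+    // Fast path: nothing to clear if no nulls have been set since the last reset.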
+    if (!anyNullsSet) return;
     for (int i = 0; i < count; ++i) {
       nulls[rowId + i] = (byte)0;
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index ae95b50e1ee760dad4fc192fe9acbcff7af84bb6..14be9eec9a97a777ce4283c8dbb9d7f7c0674402 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -59,24 +59,31 @@ object ParquetReadBenchmark {
   }
 
   def intScanBenchmark(values: Int): Unit = {
+    // Benchmarks running through Spark SQL.
+    val sqlBenchmark = new Benchmark("SQL Single Int Column Scan", values)
+    // Benchmarks driving the reader component directly.
+    val parquetReaderBenchmark = new Benchmark("Parquet Reader Single Int Column Scan", values)
+
     withTempPath { dir =>
-      sqlContext.range(values).write.parquet(dir.getCanonicalPath)
-      withTempTable("tempTable") {
+      withTempTable("t1", "tempTable") {
+        sqlContext.range(values).registerTempTable("t1")
+        sqlContext.sql("select cast(id as INT) as id from t1")
+            .write.parquet(dir.getCanonicalPath)
         sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
-        val benchmark = new Benchmark("Single Int Column Scan", values)
 
-        benchmark.addCase("SQL Parquet Reader") { iter =>
+        sqlBenchmark.addCase("SQL Parquet Reader") { iter =>
           sqlContext.sql("select sum(id) from tempTable").collect()
         }
 
-        benchmark.addCase("SQL Parquet MR") { iter =>
+        sqlBenchmark.addCase("SQL Parquet MR") { iter =>
           withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
             sqlContext.sql("select sum(id) from tempTable").collect()
           }
         }
 
         val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
-        benchmark.addCase("ParquetReader") { num =>
+        // Driving the parquet reader directly without Spark.
+        parquetReaderBenchmark.addCase("ParquetReader") { num =>
           var sum = 0L
           files.map(_.asInstanceOf[String]).foreach { p =>
             val reader = new UnsafeRowParquetRecordReader
@@ -87,26 +94,82 @@ object ParquetReadBenchmark {
               if (!record.isNullAt(0)) sum += record.getInt(0)
             }
             reader.close()
-        }}
+          }
+        }
+
+        // Driving the parquet reader in batch mode directly.
+        parquetReaderBenchmark.addCase("ParquetReader(Batched)") { num =>
+          var sum = 0L
+          files.map(_.asInstanceOf[String]).foreach { p =>
+            val reader = new UnsafeRowParquetRecordReader
+            try {
+              reader.initialize(p, ("id" :: Nil).asJava)
+              val batch = reader.resultBatch()
+              val col = batch.column(0)
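+              // resultBatch() always returns the same reused object, so the column
+              // reference remains valid across calls to nextBatch().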
+              while (reader.nextBatch()) {
+                val numRows = batch.numRows()
+                var i = 0
+                while (i < numRows) {
+                  if (!col.getIsNull(i)) sum += col.getInt(i)
+                  i += 1
+                }
+              }
+            } finally {
+              reader.close()
+            }
+          }
+        }
+
+        // Decoding in vectorized mode, but having the reader return rows.
+        parquetReaderBenchmark.addCase("ParquetReader(Batch -> Row)") { num =>
+          var sum = 0L
+          files.map(_.asInstanceOf[String]).foreach { p =>
+            val reader = new UnsafeRowParquetRecordReader
+            try {
+              reader.initialize(p, ("id" :: Nil).asJava)
+              val batch = reader.resultBatch()
+              while (reader.nextBatch()) {
+                val it = batch.rowIterator()
+                while (it.hasNext) {
+                  val record = it.next()
+                  if (!record.isNullAt(0)) sum += record.getInt(0)
+                }
+              }
+            } finally {
+              reader.close()
+            }
+          }
+        }
 
         /*
-          Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
-          Single Int Column Scan:      Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-          -------------------------------------------------------------------------
-          SQL Parquet Reader                 1910.0            13.72         1.00 X
-          SQL Parquet MR                     2330.0            11.25         0.82 X
-          ParquetReader                      1252.6            20.93         1.52 X
+        Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
+        SQL Single Int Column Scan:        Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+        -------------------------------------------------------------------------------
+        SQL Parquet Reader                       1682.6            15.58         1.00 X
+        SQL Parquet MR                           2379.6            11.02         0.71 X
         */
-        benchmark.run()
+        sqlBenchmark.run()
+
+        /*
+        Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
+        Parquet Reader Single Int Column Scan:     Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+        -------------------------------------------------------------------------------
+        ParquetReader                            610.40            25.77         1.00 X
+        ParquetReader(Batched)                   172.66            91.10         3.54 X
+        ParquetReader(Batch -> Row)              192.28            81.80         3.17 X
+        */
+        parquetReaderBenchmark.run()
       }
     }
   }
 
   def intStringScanBenchmark(values: Int): Unit = {
+    val benchmark = new Benchmark("Int and String Scan", values)
+
     withTempPath { dir =>
       withTempTable("t1", "tempTable") {
         sqlContext.range(values).registerTempTable("t1")
-        sqlContext.sql("select id as c1, cast(id as STRING) as c2 from t1")
+        sqlContext.sql("select cast(id as INT) as c1, cast(id as STRING) as c2 from t1")
             .write.parquet(dir.getCanonicalPath)
         sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
index e28153d12a354f31765910a6df26b86ec99690aa..bfe944d835bd304ec292c436fb7bc9b12f6ad8ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.nio.ByteBuffer
 
+import org.apache.spark.memory.MemoryMode
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.vectorized.ColumnVector
 import org.apache.spark.sql.types.IntegerType
@@ -136,7 +137,7 @@ object ColumnarBatchBenchmark {
 
     // Access through the column API with on heap memory
     val columnOnHeap = { i: Int =>
-      val col = ColumnVector.allocate(count, IntegerType, false)
+      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.ON_HEAP)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -155,7 +156,7 @@ object ColumnarBatchBenchmark {
 
     // Access through the column API with off heap memory
     def columnOffHeap = { i: Int => {
-      val col = ColumnVector.allocate(count, IntegerType, true)
+      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.OFF_HEAP)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -174,7 +175,7 @@ object ColumnarBatchBenchmark {
 
     // Access by directly getting the buffer backing the column.
     val columnOffheapDirect = { i: Int =>
-      val col = ColumnVector.allocate(count, IntegerType, true)
+      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.OFF_HEAP)
       var sum = 0L
       for (n <- 0L until iters) {
         var addr = col.valuesNativeAddress()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 305a83e3e45c9a74fcb76e9e211966a59875a070..d5e517c7f56be0bc358c2a1e57f92a20aabcba97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.memory.MemoryMode
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
@@ -28,10 +29,10 @@ import org.apache.spark.unsafe.Platform
 
 class ColumnarBatchSuite extends SparkFunSuite {
   test("Null Apis") {
-    (false :: true :: Nil).foreach { offHeap => {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val reference = mutable.ArrayBuffer.empty[Boolean]
 
-      val column = ColumnVector.allocate(1024, IntegerType, offHeap)
+      val column = ColumnVector.allocate(1024, IntegerType, memMode)
       var idx = 0
       assert(column.anyNullsSet() == false)
 
@@ -64,7 +65,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
       reference.zipWithIndex.foreach { v =>
         assert(v._1 == column.getIsNull(v._2))
-        if (offHeap) {
+        if (memMode == MemoryMode.OFF_HEAP) {
           val addr = column.nullsNativeAddress()
           assert(v._1 == (Platform.getByte(null, addr + v._2) == 1), "index=" + v._2)
         }
@@ -74,12 +75,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
 
   test("Int Apis") {
-    (false :: true :: Nil).foreach { offHeap => {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Int]
 
-      val column = ColumnVector.allocate(1024, IntegerType, offHeap)
+      val column = ColumnVector.allocate(1024, IntegerType, memMode)
       var idx = 0
 
       val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).toArray
@@ -131,8 +132,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getInt(v._2), "Seed = " + seed + " Off Heap=" + offHeap)
-        if (offHeap) {
+        assert(v._1 == column.getInt(v._2), "Seed = " + seed + " Mem Mode=" + memMode)
+        if (memMode == MemoryMode.OFF_HEAP) {
           val addr = column.valuesNativeAddress()
           assert(v._1 == Platform.getInt(null, addr + 4 * v._2))
         }
@@ -142,12 +143,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
 
   test("Double APIs") {
-    (false :: true :: Nil).foreach { offHeap => {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Double]
 
-      val column = ColumnVector.allocate(1024, DoubleType, offHeap)
+      val column = ColumnVector.allocate(1024, DoubleType, memMode)
       var idx = 0
 
       val values = (1.0 :: 2.0 :: 3.0 :: 4.0 :: 5.0 :: Nil).toArray
@@ -198,8 +199,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getDouble(v._2), "Seed = " + seed + " Off Heap=" + offHeap)
-        if (offHeap) {
+        assert(v._1 == column.getDouble(v._2), "Seed = " + seed + " Mem Mode=" + memMode)
+        if (memMode == MemoryMode.OFF_HEAP) {
           val addr = column.valuesNativeAddress()
           assert(v._1 == Platform.getDouble(null, addr + 8 * v._2))
         }
@@ -209,13 +210,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
 
   test("ColumnarBatch basic") {
-    (false :: true :: Nil).foreach { offHeap => {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val schema = new StructType()
         .add("intCol", IntegerType)
         .add("doubleCol", DoubleType)
         .add("intCol2", IntegerType)
 
-      val batch = ColumnarBatch.allocate(schema, offHeap)
+      val batch = ColumnarBatch.allocate(schema, memMode)
       assert(batch.numCols() == 3)
       assert(batch.numRows() == 0)
       assert(batch.numValidRows() == 0)