diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java
new file mode 100644
index 0000000000000000000000000000000000000000..0930edeb352dc56ed796784baca580e6c329c88f
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.spark.sql.execution.vectorized.Dictionary;
+
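+/**
+ * Adapter that exposes a Parquet dictionary through Spark's vectorized Dictionary
+ * interface, so that ColumnVector need not depend on Parquet classes directly.
+ */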
+public final class ParquetDictionary implements Dictionary {
+  private final org.apache.parquet.column.Dictionary dictionary;
+
+  public ParquetDictionary(org.apache.parquet.column.Dictionary dictionary) {
+    this.dictionary = dictionary;
+  }
+
+  @Override
+  public int decodeToInt(int id) {
+    return dictionary.decodeToInt(id);
+  }
+
+  @Override
+  public long decodeToLong(int id) {
+    return dictionary.decodeToLong(id);
+  }
+
+  @Override
+  public float decodeToFloat(int id) {
+    return dictionary.decodeToFloat(id);
+  }
+
+  @Override
+  public double decodeToDouble(int id) {
+    return dictionary.decodeToDouble(id);
+  }
+
+  @Override
+  public byte[] decodeToBinary(int id) {
+    return dictionary.decodeToBinary(id).getBytes();
+  }
+}
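A minimal sketch of the adapter in action (illustrative only, not part of the patch; the anonymous Parquet dictionary below stands in for one decoded from a real dictionary page):

    import java.nio.charset.StandardCharsets;
    import org.apache.parquet.column.Encoding;
    import org.apache.parquet.io.api.Binary;
    import org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary;
    import org.apache.spark.sql.execution.vectorized.Dictionary;

    public class ParquetDictionarySketch {
      public static void main(String[] args) {
        // Stand-in Parquet dictionary; real ones are decoded from a dictionary page.
        org.apache.parquet.column.Dictionary parquetDict =
            new org.apache.parquet.column.Dictionary(Encoding.PLAIN_DICTIONARY) {
              @Override public int getMaxId() { return 0; }
              @Override public Binary decodeToBinary(int id) {
                return Binary.fromString("value-" + id);
              }
            };
        // The adapter converts Binary to byte[] and hides all Parquet types.
        Dictionary sparkDict = new ParquetDictionary(parquetDict);
        System.out.println(new String(sparkDict.decodeToBinary(0), StandardCharsets.UTF_8));
      }
    }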
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 9d641b528723a6dda5d1fe6b343b6e414d53042d..fd8db1727212f48a983d12ff258bd51483e87227 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -169,7 +169,7 @@ public class VectorizedColumnReader {
           // Column vector supports lazy decoding of dictionary values so just set the dictionary.
           // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
           // non-dictionary encoded values have already been added).
-          column.setDictionary(dictionary);
+          column.setDictionary(new ParquetDictionary(dictionary));
         } else {
           decodeDictionaryIds(rowId, num, column, dictionaryIds);
         }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 51bdf0f0f2291878bc7eeb37ce5bc15ecbe01056..04f8141d66e9d3f22c9e6035e3ea7ee39529df7a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -154,12 +154,6 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     return (float) rowsReturned / totalRowCount;
   }
 
-  /**
-   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
-   * This object is reused. Calling this enables the vectorized reader. This should be called
-   * before any calls to nextKeyValue/nextBatch.
-   */
-
   // Creates a columnar batch that includes the schema from the data files and the additional
   // partition columns appended to the end of the batch.
   // For example, if the data contains two columns, with 2 partition columns:
@@ -204,12 +198,17 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     initBatch(DEFAULT_MEMORY_MODE, partitionColumns, partitionValues);
   }
 
+  /**
+   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
+   * This object is reused. Calling this enables the vectorized reader. This should be called
+   * before any calls to nextKeyValue/nextBatch.
+   */
   public ColumnarBatch resultBatch() {
     if (columnarBatch == null) initBatch();
     return columnarBatch;
   }
 
-  /*
+  /**
    * Can be called before any rows are returned to enable returning columnar batches directly.
    */
   public void enableReturningBatches() {
@@ -237,9 +236,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   }
 
   private void initializeInternal() throws IOException, UnsupportedOperationException {
-    /**
-     * Check that the requested schema is supported.
-     */
+    // Check that the requested schema is supported.
     missingColumns = new boolean[requestedSchema.getFieldCount()];
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
       Type t = requestedSchema.getFields().get(i);
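A hedged usage sketch of the call order the relocated javadoc describes (the path is a placeholder, initialize(String, List) is the overload exposed for testing, and the no-arg constructor is assumed for this version):

    VectorizedParquetRecordReader reader = new VectorizedParquetRecordReader();
    try {
      reader.initialize("/tmp/example.parquet", null);  // null projects all columns
      ColumnarBatch batch = reader.resultBatch();        // reused batch; enables the vectorized path
      while (reader.nextBatch()) {                       // fills the batch with the next rows
        System.out.println("rows in batch: " + batch.numRows());
      }
    } finally {
      reader.close();
    }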
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index ad267ab0c9c477fa5310054814e030b4e5379b9a..24260a60197f205f017dd555583c9adbca83be79 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -20,8 +20,6 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.io.api.Binary;
 
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -313,8 +311,8 @@ public abstract class ColumnVector implements AutoCloseable {
   }
 
   /**
-   * Ensures that there is enough storage to store capcity elements. That is, the put() APIs
-   * must work for all rowIds < capcity.
+   * Ensures that there is enough storage to store capacity elements. That is, the put() APIs
+   * must work for all rowIds < capacity.
    */
   protected abstract void reserveInternal(int capacity);
 
@@ -479,7 +477,6 @@ public abstract class ColumnVector implements AutoCloseable {
 
   /**
    * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   * src should contain `count` doubles written as ieee format.
    */
   public abstract void putFloats(int rowId, int count, float[] src, int srcIndex);
 
@@ -506,7 +503,6 @@ public abstract class ColumnVector implements AutoCloseable {
 
   /**
    * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   * src should contain `count` doubles written as ieee format.
    */
   public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex);
 
@@ -628,8 +624,8 @@ public abstract class ColumnVector implements AutoCloseable {
       ColumnVector.Array a = getByteArray(rowId);
       return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
     } else {
-      Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
-      return UTF8String.fromBytes(v.getBytes());
+      byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
+      return UTF8String.fromBytes(bytes);
     }
   }
 
@@ -643,8 +639,7 @@ public abstract class ColumnVector implements AutoCloseable {
       System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length);
       return bytes;
     } else {
-      Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
-      return v.getBytes();
+      return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
     }
   }
 
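A hedged sketch of the lazy-decode path these accessors take (assumes the allocate and reserveDictionaryIds helpers on ColumnVector in this version; sparkDict is any Spark-side Dictionary, e.g. a ParquetDictionary):

    ColumnVector col = ColumnVector.allocate(4, DataTypes.StringType, MemoryMode.ON_HEAP);
    col.setDictionary(sparkDict);                    // Spark-side Dictionary, not Parquet's
    ColumnVector ids = col.reserveDictionaryIds(4);  // one dictionary id per row
    ids.putInt(0, 0);
    UTF8String s = col.getUTF8String(0);             // decodes id 0 via decodeToBinary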
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/Dictionary.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/Dictionary.java
new file mode 100644
index 0000000000000000000000000000000000000000..c698168b4c278e2f3586ed6fc5cfabc858e4d975
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/Dictionary.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+/**
+ * The interface for a dictionary used by ColumnVector to decode dictionary-encoded values.
+ */
+public interface Dictionary {
+
+  int decodeToInt(int id);
+
+  long decodeToLong(int id);
+
+  float decodeToFloat(int id);
+
+  double decodeToDouble(int id);
+
+  byte[] decodeToBinary(int id);
+}
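Because the new interface has no Parquet dependency, a test can supply a trivial in-memory implementation (hypothetical, not part of this patch):

    import java.nio.charset.StandardCharsets;
    import org.apache.spark.sql.execution.vectorized.Dictionary;

    // Hypothetical fixed-table dictionary for tests.
    public final class TestDictionary implements Dictionary {
      private final long[] values = {10L, 20L, 30L};

      @Override public int decodeToInt(int id) { return (int) values[id]; }
      @Override public long decodeToLong(int id) { return values[id]; }
      @Override public float decodeToFloat(int id) { return values[id]; }
      @Override public double decodeToDouble(int id) { return values[id]; }
      @Override public byte[] decodeToBinary(int id) {
        return Long.toString(values[id]).getBytes(StandardCharsets.UTF_8);
      }
    }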