diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index 9d9150246c8d4d349925f4eafcd6f88625d23a46..10df8c3310092cdbffa177755da88968a05290c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.parquet
 
 import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}
 
+import parquet.column.Dictionary
 import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
 import parquet.schema.MessageType
 
@@ -102,12 +103,8 @@ private[sql] object CatalystConverter {
       }
       // Strings, Shorts and Bytes do not have a corresponding type in Parquet
       // so we need to treat them separately
-      case StringType => {
-        new CatalystPrimitiveConverter(parent, fieldIndex) {
-          override def addBinary(value: Binary): Unit =
-            parent.updateString(fieldIndex, value)
-        }
-      }
+      case StringType =>
+        new CatalystPrimitiveStringConverter(parent, fieldIndex)
       case ShortType => {
         new CatalystPrimitiveConverter(parent, fieldIndex) {
           override def addInt(value: Int): Unit =
@@ -197,8 +194,8 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
   protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
     updateField(fieldIndex, value.getBytes)
 
-  protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit =
-    updateField(fieldIndex, value.toStringUsingUTF8)
+  protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
+    updateField(fieldIndex, value)
 
   protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
     updateField(fieldIndex, readDecimal(new Decimal(), value, ctype))
@@ -384,8 +381,8 @@ private[parquet] class CatalystPrimitiveRowConverter(
   override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit =
     current.update(fieldIndex, value.getBytes)
 
-  override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit =
-    current.setString(fieldIndex, value.toStringUsingUTF8)
+  override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
+    current.setString(fieldIndex, value)
 
   override protected[parquet] def updateDecimal(
       fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
@@ -426,6 +423,39 @@
     parent.updateLong(fieldIndex, value)
 }
 
+/**
+ * A `parquet.io.api.PrimitiveConverter` that converts Parquet Binary values to Catalyst Strings.
+ * Supports dictionaries to reduce the Binary-to-String conversion overhead.
+ *
+ * Follows Parquet's own pattern of using dictionaries, where available, for String conversion.
+ *
+ * @param parent The parent group converter.
+ * @param fieldIndex The index inside the record.
+ */
+private[parquet] class CatalystPrimitiveStringConverter(parent: CatalystConverter, fieldIndex: Int)
+  extends CatalystPrimitiveConverter(parent, fieldIndex) {
+
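+  // Dictionary entries pre-decoded to Strings; stays null when the column is not dictionary encoded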
+  private[this] var dict: Array[String] = null
+
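+  // Tells Parquet that this converter can consume dictionary ids via addValueFromDictionary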
+  override def hasDictionarySupport: Boolean = true
+
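+  // Decode every dictionary entry up front so each distinct Binary becomes a String only once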
+  override def setDictionary(dictionary: Dictionary): Unit =
+    dict = Array.tabulate(dictionary.getMaxId + 1) { i =>
+      dictionary.decodeToBinary(i).toStringUsingUTF8
+    }
+
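+  // Dictionary-encoded values arrive as ids; reuse the String already decoded for that id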
+  override def addValueFromDictionary(dictionaryId: Int): Unit =
+    parent.updateString(fieldIndex, dict(dictionaryId))
+
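+  // Non-dictionary (plain-encoded) values still arrive as Binary and are decoded directly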
+  override def addBinary(value: Binary): Unit =
+    parent.updateString(fieldIndex, value.toStringUsingUTF8)
+}
+
 private[parquet] object CatalystArrayConverter {
   val INITIAL_ARRAY_SIZE = 20
 }
@@ -583,9 +613,9 @@
     elements += 1
   }
 
-  override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit = {
+  override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit = {
     checkGrowBuffer()
-    buffer(elements) = value.toStringUsingUTF8.asInstanceOf[NativeType]
+    buffer(elements) = value.asInstanceOf[NativeType]
     elements += 1
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 1263ff818ea19c714c1d6fa11d8bc087ff631987..3d82f4bce7778b9d4f28c2f79361e6647ba98a93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -85,4 +85,15 @@ class ParquetQuerySuite extends QueryTest with ParquetTest {
       checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
     }
   }
+
+  test("SPARK-5309 strings stored using dictionary compression in parquet") {
+    withParquetTable((0 until 1000).map(i => ("same", "run_" + i / 100, 1)), "t") {
+      checkAnswer(sql(s"SELECT _1, _2, SUM(_3) FROM t GROUP BY _1, _2"),
+        (0 until 10).map(i => Row("same", "run_" + i, 100)))
+
+      checkAnswer(sql(s"SELECT _1, _2, SUM(_3) FROM t WHERE _2 = 'run_5' GROUP BY _1, _2"),
+        List(Row("same", "run_5", 100)))
+    }
+  }
 }