diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 35c51dec0bcf5bf6cb98903e5a899a0098f87eea..90de11182e605f7210445823db30383cdcd9fb30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -31,6 +31,7 @@ private[spark] object SQLConf {
   val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
   val CODEGEN_ENABLED = "spark.sql.codegen"
   val DIALECT = "spark.sql.dialect"
+  val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
 
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
@@ -87,8 +88,7 @@ trait SQLConf {
    *
    * Defaults to false as this feature is currently experimental.
    */
-  private[spark] def codegenEnabled: Boolean =
-    if (getConf(CODEGEN_ENABLED, "false") == "true") true else false
+  private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "false").toBoolean
 
   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
@@ -108,6 +108,12 @@ trait SQLConf {
   private[spark] def defaultSizeInBytes: Long =
     getConf(DEFAULT_SIZE_IN_BYTES, (autoBroadcastJoinThreshold + 1).toString).toLong
 
+  /**
+   * When set to true, BINARY columns in Parquet files are always read as strings (StringType).
+   */
+  private[spark] def isParquetBinaryAsString: Boolean =
+    getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index b3bae5db0edbc3882a9dc1886d60cefe0e2c049e..053b2a154389c9cf2fba691feb4e153b5bf1fa50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -60,7 +60,11 @@ private[sql] case class ParquetRelation(
       .getSchema
 
   /** Attributes */
-  override val output = ParquetTypesConverter.readSchemaFromFile(new Path(path), conf)
+  override val output =
+    ParquetTypesConverter.readSchemaFromFile(
+      new Path(path),
+      conf,
+      sqlContext.isParquetBinaryAsString)
 
   override def newInstance = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 6d4ce32ac5bfa08d21c62cabfcf0486353fb76df..6a657c20fe46c6b9bd03ebc7a92608c5318e90b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -80,9 +80,10 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
       }
     }
     // if both unavailable, fall back to deducing the schema from the given Parquet schema
+    // TODO: Why can it be null?
     if (schema == null)  {
       log.debug("falling back to Parquet read schema")
-      schema = ParquetTypesConverter.convertToAttributes(parquetSchema)
+      schema = ParquetTypesConverter.convertToAttributes(parquetSchema, false)
     }
     log.debug(s"list of attributes that will be read: $schema")
     new RowRecordMaterializer(parquetSchema, schema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 37091bcf73dd6bc7386c06248e04fceeff10c493..b0579f76da0732938b19bb29017866b70821cd6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -43,10 +43,13 @@ private[parquet] object ParquetTypesConverter extends Logging {
   def isPrimitiveType(ctype: DataType): Boolean =
     classOf[PrimitiveType] isAssignableFrom ctype.getClass
 
-  def toPrimitiveDataType(parquetType: ParquetPrimitiveType): DataType =
+  def toPrimitiveDataType(
+      parquetType: ParquetPrimitiveType,
+      binaryAsString: Boolean): DataType =
     parquetType.getPrimitiveTypeName match {
       case ParquetPrimitiveTypeName.BINARY
-        if parquetType.getOriginalType == ParquetOriginalType.UTF8 => StringType
+        if (parquetType.getOriginalType == ParquetOriginalType.UTF8 ||
+          binaryAsString) => StringType
       case ParquetPrimitiveTypeName.BINARY => BinaryType
       case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
       case ParquetPrimitiveTypeName.DOUBLE => DoubleType
@@ -85,7 +88,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * @param parquetType The type to convert.
    * @return The corresponding Catalyst type.
    */
-  def toDataType(parquetType: ParquetType): DataType = {
+  def toDataType(parquetType: ParquetType, isBinaryAsString: Boolean): DataType = {
     def correspondsToMap(groupType: ParquetGroupType): Boolean = {
       if (groupType.getFieldCount != 1 || groupType.getFields.apply(0).isPrimitive) {
         false
@@ -107,7 +110,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }
 
     if (parquetType.isPrimitive) {
-      toPrimitiveDataType(parquetType.asPrimitiveType)
+      toPrimitiveDataType(parquetType.asPrimitiveType, isBinaryAsString)
     } else {
       val groupType = parquetType.asGroupType()
       parquetType.getOriginalType match {
@@ -116,7 +119,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
         case ParquetOriginalType.LIST => { // TODO: check enums!
           assert(groupType.getFieldCount == 1)
           val field = groupType.getFields.apply(0)
-          ArrayType(toDataType(field), containsNull = false)
+          ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
         }
         case ParquetOriginalType.MAP => {
           assert(
@@ -126,9 +129,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
           assert(
             keyValueGroup.getFieldCount == 2,
             "Parquet Map type malformatted: nested group should have 2 (key, value) fields!")
-          val keyType = toDataType(keyValueGroup.getFields.apply(0))
+          val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
           assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
-          val valueType = toDataType(keyValueGroup.getFields.apply(1))
+          val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
           assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
           // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
           // at here.
@@ -138,22 +141,22 @@ private[parquet] object ParquetTypesConverter extends Logging {
           // Note: the order of these checks is important!
           if (correspondsToMap(groupType)) { // MapType
             val keyValueGroup = groupType.getFields.apply(0).asGroupType()
-            val keyType = toDataType(keyValueGroup.getFields.apply(0))
+            val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
             assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
-            val valueType = toDataType(keyValueGroup.getFields.apply(1))
+            val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
             assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
             // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
             // at here.
             MapType(keyType, valueType)
           } else if (correspondsToArray(groupType)) { // ArrayType
-            val elementType = toDataType(groupType.getFields.apply(0))
+            val elementType = toDataType(groupType.getFields.apply(0), isBinaryAsString)
             ArrayType(elementType, containsNull = false)
           } else { // everything else: StructType
             val fields = groupType
               .getFields
               .map(ptype => new StructField(
               ptype.getName,
-              toDataType(ptype),
+              toDataType(ptype, isBinaryAsString),
               ptype.getRepetition != Repetition.REQUIRED))
             StructType(fields)
           }
@@ -276,7 +279,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }
   }
 
-  def convertToAttributes(parquetSchema: ParquetType): Seq[Attribute] = {
+  def convertToAttributes(parquetSchema: ParquetType, isBinaryAsString: Boolean): Seq[Attribute] = {
     parquetSchema
       .asGroupType()
       .getFields
@@ -284,7 +287,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
         field =>
           new AttributeReference(
             field.getName,
-            toDataType(field),
+            toDataType(field, isBinaryAsString),
             field.getRepetition != Repetition.REQUIRED)())
   }
 
@@ -404,7 +407,10 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * @param conf The Hadoop configuration to use.
    * @return A list of attributes that make up the schema.
    */
-  def readSchemaFromFile(origPath: Path, conf: Option[Configuration]): Seq[Attribute] = {
+  def readSchemaFromFile(
+      origPath: Path,
+      conf: Option[Configuration],
+      isBinaryAsString: Boolean): Seq[Attribute] = {
     val keyValueMetadata: java.util.Map[String, String] =
       readMetaData(origPath, conf)
         .getFileMetaData
@@ -413,7 +419,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
       convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
     } else {
       val attributes = convertToAttributes(
-        readMetaData(origPath, conf).getFileMetaData.getSchema)
+        readMetaData(origPath, conf).getFileMetaData.getSchema, isBinaryAsString)
       log.info(s"Falling back to schema conversion from Parquet types; result: $attributes")
       attributes
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 502f6702e394e08136c7f4e8a979cd4769731e29..172dcd6aa0ee37e5c8db19bc59c6639074f9739b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -21,8 +21,6 @@ import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
 
 import parquet.hadoop.ParquetFileWriter
 import parquet.hadoop.util.ContextUtil
-import parquet.schema.MessageTypeParser
-
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.Job
 
@@ -33,7 +31,6 @@ import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType}
 import org.apache.spark.sql.catalyst.util.getTempFilePath
-import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.util.Utils
@@ -138,6 +135,57 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     }
   }
 
+  test("Treat binary as string") {
+    val oldIsParquetBinaryAsString = TestSQLContext.isParquetBinaryAsString
+
+    // Create the test file.
+    val file = getTempFilePath("parquet")
+    val path = file.toString
+    val range = (0 to 255)
+    val rowRDD = TestSQLContext.sparkContext.parallelize(range)
+      .map(i => org.apache.spark.sql.Row(i, s"val_$i".getBytes))
+    // We need to ask Parquet to store the String column as a Binary column.
+    val schema = StructType(
+      StructField("c1", IntegerType, false) ::
+      StructField("c2", BinaryType, false) :: Nil)
+    val schemaRDD1 = applySchema(rowRDD, schema)
+    schemaRDD1.saveAsParquetFile(path)
+    val resultWithBinary = parquetFile(path).collect
+    range.foreach {
+      i =>
+        assert(resultWithBinary(i).getInt(0) === i)
+        assert(resultWithBinary(i)(1) === s"val_$i".getBytes)
+    }
+
+    TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, "true")
+    // This ParquetRelation always uses the Parquet types to derive its output.
+    val parquetRelation = new ParquetRelation(
+      path,
+      Some(TestSQLContext.sparkContext.hadoopConfiguration),
+      TestSQLContext) {
+      override val output =
+        ParquetTypesConverter.convertToAttributes(
+          ParquetTypesConverter.readMetaData(new Path(path), conf).getFileMetaData.getSchema,
+          TestSQLContext.isParquetBinaryAsString)
+    }
+    val schemaRDD = new SchemaRDD(TestSQLContext, parquetRelation)
+    val resultWithString = schemaRDD.collect
+    range.foreach {
+      i =>
+        assert(resultWithString(i).getInt(0) === i)
+        assert(resultWithString(i)(1) === s"val_$i")
+    }
+
+    schemaRDD.registerTempTable("tmp")
+    checkAnswer(
+      sql("SELECT c1, c2 FROM tmp WHERE c2 = 'val_5' OR c2 = 'val_7'"),
+      (5, "val_5") ::
+      (7, "val_7") :: Nil)
+
+    // Set it back.
+    TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString)
+  }
+
   test("Read/Write All Types with non-primitive type") {
     val tempDir = getTempFilePath("parquetTest").getCanonicalPath
     val range = (0 to 255)
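
For reference, a minimal usage sketch of the flag this patch introduces, written against the Spark 1.x SQLContext API. The local SparkContext setup, the input path, and the column name `c2` are hypothetical and not part of the patch; only `spark.sql.parquet.binaryAsString` comes from the change above.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object BinaryAsStringExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "BinaryAsStringExample")
    val sqlContext = new SQLContext(sc)

    // Opt in to reading un-annotated BINARY columns as StringType
    // (spark.sql.parquet.binaryAsString defaults to "false").
    sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")

    // Hypothetical file written by a system that stores strings as plain BINARY.
    val data = sqlContext.parquetFile("/tmp/strings_as_binary.parquet")
    data.registerTempTable("t")

    // String comparisons now work directly on the formerly binary column.
    sqlContext.sql("SELECT * FROM t WHERE c2 = 'val_5'").collect().foreach(println)

    sc.stop()
  }
}
```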