diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 4e4f647767dc924b8583efe9b4656d5304b3e5a5..225ec6db7d553d1fe3d6587a0200b1c60e9086f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -126,6 +126,13 @@ private[sql] case class ParquetTableScan(
         conf)
 
     if (requestedPartitionOrdinals.nonEmpty) {
+      // This check is based on CatalystConverter.createRootConverter.
+      val primitiveRow = output.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
+      // Uses temporary variable to avoid the whole `ParquetTableScan` object being captured into
+      // the `mapPartitionsWithInputSplit` closure below.
+      val outputSize = output.size
+
       baseRDD.mapPartitionsWithInputSplit { case (split, iter) =>
         val partValue = "([^=]+)=([^=]+)".r
         val partValues =
@@ -143,19 +150,47 @@ private[sql] case class ParquetTableScan(
           relation.partitioningAttributes
             .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
 
-        new Iterator[Row] {
-          def hasNext = iter.hasNext
-          def next() = {
-            val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
-
-            // Parquet will leave partitioning columns empty, so we fill them in here.
-            var i = 0
-            while (i < requestedPartitionOrdinals.size) {
-              row(requestedPartitionOrdinals(i)._2) =
-                partitionRowValues(requestedPartitionOrdinals(i)._1)
-              i += 1
+        if (primitiveRow) {
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+              val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
+
+              // Parquet will leave partitioning columns empty, so we fill them in here.
+              var i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                row(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              row
+            }
+          }
+        } else {
+          // Create a mutable row since we need to fill in values from partition columns.
+          val mutableRow = new GenericMutableRow(outputSize)
+          new Iterator[Row] {
+            def hasNext = iter.hasNext
+            def next() = {
+              // We are using CatalystGroupConverter and it returns a GenericRow.
+              // Since GenericRow is not mutable, we just cast it to a Row.
+              val row = iter.next()._2.asInstanceOf[Row]
+
+              var i = 0
+              while (i < row.size) {
+                mutableRow(i) = row(i)
+                i += 1
+              }
+              // Parquet will leave partitioning columns empty, so we fill them in here.
+              i = 0
+              while (i < requestedPartitionOrdinals.size) {
+                mutableRow(requestedPartitionOrdinals(i)._2) =
+                  partitionRowValues(requestedPartitionOrdinals(i)._1)
+                i += 1
+              }
+              mutableRow
             }
-            row
           }
         }
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index e648618468d5d7f2113b5de5a16739abf13b598f..6d56be3ab8dd4ba72f8fbcc6deefba0ab43b0cc2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -482,6 +482,10 @@ private[sql] case class ParquetRelation2(
     // When the data does not include the key and the key is requested then we must fill it in
     // based on information from the input split.
     if (!partitionKeysIncludedInDataSchema && partitionKeyLocations.nonEmpty) {
+      // This check is based on CatalystConverter.createRootConverter.
+      val primitiveRow =
+        requestedSchema.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))
+
       baseRDD.mapPartitionsWithInputSplit { case (split: ParquetInputSplit, iterator) =>
         val partValues = selectedPartitions.collectFirst {
           case p if split.getPath.getParent.toString == p.path => p.values
@@ -489,16 +493,42 @@ private[sql] case class ParquetRelation2(
 
         val requiredPartOrdinal = partitionKeyLocations.keys.toSeq
 
-        iterator.map { pair =>
-          val row = pair._2.asInstanceOf[SpecificMutableRow]
-          var i = 0
-          while (i < requiredPartOrdinal.size) {
-            // TODO Avoids boxing cost here!
-            val partOrdinal = requiredPartOrdinal(i)
-            row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
-            i += 1
+        if (primitiveRow) {
+          iterator.map { pair =>
+            // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
+            val row = pair._2.asInstanceOf[SpecificMutableRow]
+            var i = 0
+            while (i < requiredPartOrdinal.size) {
+              // TODO Avoids boxing cost here!
+              val partOrdinal = requiredPartOrdinal(i)
+              row.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
+              i += 1
+            }
+            row
+          }
+        } else {
+          // Create a mutable row since we need to fill in values from partition columns.
+          val mutableRow = new GenericMutableRow(requestedSchema.size)
+          iterator.map { pair =>
+            // We are using CatalystGroupConverter and it returns a GenericRow.
+            // Since GenericRow is not mutable, we just cast it to a Row.
+            val row = pair._2.asInstanceOf[Row]
+            var i = 0
+            while (i < row.size) {
+              // TODO Avoids boxing cost here!
+              mutableRow(i) = row(i)
+              i += 1
+            }
+
+            i = 0
+            while (i < requiredPartOrdinal.size) {
+              // TODO Avoids boxing cost here!
+              val partOrdinal = requiredPartOrdinal(i)
+              mutableRow.update(partitionKeyLocations(partOrdinal), partValues(partOrdinal))
+              i += 1
+            }
+            mutableRow
           }
-          row
         }
       }
     } else {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 6a9d9daf6750c7ea864512d80d8f37de7d5c9318..c8da8eea4e6463d78e4bddf6fdb8d20d7002aae1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -36,6 +36,20 @@ case class ParquetData(intField: Int, stringField: String)
 // The data that also includes the partitioning key
 case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
 
+case class StructContainer(intStructField :Int, stringStructField: String)
+
+case class ParquetDataWithComplexTypes(
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
+
+case class ParquetDataWithKeyAndComplexTypes(
+    p: Int,
+    intField: Int,
+    stringField: String,
+    structField: StructContainer,
+    arrayField: Seq[Int])
 
 /**
  * A suite to test the automatic conversion of metastore tables with parquet data to use the
@@ -86,6 +100,38 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
       location '${new File(normalTableDir, "normal").getCanonicalPath}'
     """)
 
+    sql(s"""
+      CREATE EXTERNAL TABLE partitioned_parquet_with_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+       STORED AS
+       INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+       OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      LOCATION '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+    """)
+
+    sql(s"""
+      CREATE EXTERNAL TABLE partitioned_parquet_with_key_and_complextypes
+      (
+        intField INT,
+        stringField STRING,
+        structField STRUCT<intStructField: INT, stringStructField: STRING>,
+        arrayField ARRAY<INT>
+      )
+      PARTITIONED BY (p int)
+      ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+       STORED AS
+       INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+       OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      LOCATION '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+    """)
+
     (1 to 10).foreach { p =>
       sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
     }
@@ -94,7 +140,15 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
       sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
     }
 
-    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key_and_complextypes ADD PARTITION (p=$p)")
+    }
+
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_complextypes ADD PARTITION (p=$p)")
+    }
+
+    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
     jsonRDD(rdd1).registerTempTable("jt")
     val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
     jsonRDD(rdd2).registerTempTable("jt_array")
@@ -105,6 +159,8 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
   override def afterAll(): Unit = {
     sql("DROP TABLE partitioned_parquet")
     sql("DROP TABLE partitioned_parquet_with_key")
+    sql("DROP TABLE partitioned_parquet_with_complextypes")
+    sql("DROP TABLE partitioned_parquet_with_key_and_complextypes")
     sql("DROP TABLE normal_parquet")
     sql("DROP TABLE IF EXISTS jt")
     sql("DROP TABLE IF EXISTS jt_array")
@@ -409,6 +465,22 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
         path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
       )
     """)
+
+    sql( s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_key_and_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+      )
+    """)
+
+    sql( s"""
+      CREATE TEMPORARY TABLE partitioned_parquet_with_complextypes
+      USING org.apache.spark.sql.parquet
+      OPTIONS (
+        path '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+      )
+    """)
   }
 
   test("SPARK-6016 make sure to use the latest footers") {
@@ -473,7 +545,8 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
   var partitionedTableDir: File = null
   var normalTableDir: File = null
   var partitionedTableDirWithKey: File = null
-
+  var partitionedTableDirWithComplexTypes: File = null
+  var partitionedTableDirWithKeyAndComplexTypes: File = null
 
   override def beforeAll(): Unit = {
     partitionedTableDir = File.createTempFile("parquettests", "sparksql")
@@ -509,9 +582,45 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
         .toDF()
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
+
+    partitionedTableDirWithKeyAndComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKeyAndComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10).map { i =>
+        ParquetDataWithKeyAndComplexTypes(
+          p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+      }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
+    partitionedTableDirWithComplexTypes = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithComplexTypes.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
+      sparkContext.makeRDD(1 to 10).map { i =>
+        ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
+      }.toDF().saveAsParquetFile(partDir.getCanonicalPath)
+    }
+  }
+
+  override protected def afterAll(): Unit = {
+    partitionedTableDir.delete()
+    normalTableDir.delete()
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithComplexTypes.delete()
+    partitionedTableDirWithKeyAndComplexTypes.delete()
   }
 
-  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+  Seq(
+    "partitioned_parquet",
+    "partitioned_parquet_with_key",
+    "partitioned_parquet_with_complextypes",
+    "partitioned_parquet_with_key_and_complextypes").foreach { table =>
+
     test(s"ordering of the partitioning columns $table") {
       checkAnswer(
         sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
@@ -601,6 +710,25 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
     }
   }
 
+  Seq(
+    "partitioned_parquet_with_key_and_complextypes",
+    "partitioned_parquet_with_complextypes").foreach { table =>
+
+    test(s"SPARK-5775 read struct from $table") {
+      checkAnswer(
+        sql(s"SELECT p, structField.intStructField, structField.stringStructField FROM $table WHERE p = 1"),
+        (1 to 10).map(i => Row(1, i, f"${i}_string")))
+    }
+
+    // Re-enable this after SPARK-5508 is fixed
+    ignore(s"SPARK-5775 read array from $table") {
+      checkAnswer(
+        sql(s"SELECT arrayField, p FROM $table WHERE p = 1"),
+        (1 to 10).map(i => Row(1 to i, 1)))
+    }
+  }
+
+
   test("non-part select(*)") {
     checkAnswer(
       sql("SELECT COUNT(*) FROM normal_parquet"),