diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 0a6bb44445f6e0e003005d5d1bf7e48772f9f2c6..dc4ff06df6f22d44aab694f3818a35c09f31ffd9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -19,17 +19,18 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.util.{Map => JMap}
 
-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, mapAsScalaMapConverter}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
 import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
 import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.Type.Repetition
+import org.apache.parquet.schema._
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
   // Called after `init()` when initializing Parquet record reader.
@@ -81,70 +82,10 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
     // `StructType` containing all requested columns.
     val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
 
-    // Below we construct a Parquet schema containing all requested columns.  This schema tells
-    // Parquet which columns to read.
-    //
-    // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema.  Otherwise,
-    // we have to fallback to the full file schema which contains all columns in the file.
-    // Obviously this may waste IO bandwidth since it may read more columns than requested.
-    //
-    // Two things to note:
-    //
-    // 1. It's possible that some requested columns don't exist in the target Parquet file.  For
-    //    example, in the case of schema merging, the globally merged schema may contain extra
-    //    columns gathered from other Parquet files.  These columns will be simply filled with nulls
-    //    when actually reading the target Parquet file.
-    //
-    // 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to
-    //    Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due to
-    //    non-standard behaviors of some Parquet libraries/tools.  For example, a Parquet file
-    //    containing a single integer array field `f1` may have the following legacy 2-level
-    //    structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          required INT32 element;
-    //        }
-    //      }
-    //
-    //    while `CatalystSchemaConverter` may generate a standard 3-level structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          repeated group list {
-    //            required INT32 element;
-    //          }
-    //        }
-    //      }
-    //
-    //    Apparently, we can't use the 2nd schema to read the target Parquet file as they have
-    //    different physical structures.
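+    // Below we tailor the Parquet file schema to the requested Catalyst schema (when one is set).
+    // Clipping keeps the file's own physical layout for columns that exist in the file (including
+    // legacy 2-level list layouts), and falls back to `CatalystSchemaConverter` for columns that
+    // only exist in the requested schema (e.g. after schema merging); those columns are simply
+    // read as nulls.  See `CatalystReadSupport.clipParquetSchema` for details.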
     val parquetRequestedSchema =
       maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
-        val toParquet = new CatalystSchemaConverter(conf)
-        val fileSchema = context.getFileSchema.asGroupType()
-        val fileFieldNames = fileSchema.getFields.asScala.map(_.getName).toSet
-
-        StructType
-          // Deserializes the Catalyst schema of requested columns
-          .fromString(schemaString)
-          .map { field =>
-            if (fileFieldNames.contains(field.name)) {
-              // If the field exists in the target Parquet file, extracts the field type from the
-              // full file schema and makes a single-field Parquet schema
-              new MessageType("root", fileSchema.getType(field.name))
-            } else {
-              // Otherwise, just resorts to `CatalystSchemaConverter`
-              toParquet.convert(StructType(Array(field)))
-            }
-          }
-          // Merges all single-field Parquet schemas to form a complete schema for all requested
-          // columns.  Note that it's possible that no columns are requested at all (e.g., count
-          // some partition column of a partitioned Parquet table). That's why `fold` is used here
-          // and always fallback to an empty Parquet schema.
-          .fold(new MessageType("root")) {
-            _ union _
-          }
+        val catalystRequestedSchema = StructType.fromString(schemaString)
+        CatalystReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
       }
 
     val metadata =
@@ -160,4 +101,168 @@ private[parquet] object CatalystReadSupport {
   val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
 
   val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+  /**
+   * Tailors `parquetSchema` according to `catalystSchema` by removing column paths that don't
+   * exist in `catalystSchema`, and by adding those that only exist in `catalystSchema`.
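+   *
+   * For example (mirroring the "simple nested struct" case in `ParquetSchemaSuite`), clipping
+   * {{{
+   *   message root {
+   *     required group f0 {
+   *       optional int32 f00;
+   *       optional int32 f01;
+   *     }
+   *   }
+   * }}}
+   * against a Catalyst schema that requests only `f0.f00` plus a new integer column `f1` keeps
+   * `f0.f00`, drops `f0.f01`, and appends an `optional int32 f1` converted from the Catalyst
+   * field.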
+   */
+  def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
+    Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
+  }
+
+  private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+    catalystType match {
+      case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
+        // Only clips array types with nested type as element type.
+        clipParquetListType(parquetType.asGroupType(), t.elementType)
+
+      case t: MapType if !isPrimitiveCatalystType(t.valueType) =>
+        // Only clips map types with nested type as value type.
+        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
+
+      case t: StructType =>
+        clipParquetGroup(parquetType.asGroupType(), t)
+
+      case _ =>
+        parquetType
+    }
+  }
+
+  /**
+   * Whether a Catalyst [[DataType]] is primitive.  Primitive [[DataType]] is not equivalent to
+   * [[AtomicType]].  For example, [[CalendarIntervalType]] is primitive, but it's not an
+   * [[AtomicType]].
+   */
+  private def isPrimitiveCatalystType(dataType: DataType): Boolean = {
+    dataType match {
+      case _: ArrayType | _: MapType | _: StructType => false
+      case _ => true
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[ArrayType]].  The element type
+   * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
+   * [[StructType]].
+   */
+  private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
+    // Precondition: this method should only be called for lists with nested element types.
+    assert(!isPrimitiveCatalystType(elementType))
+
+    // An unannotated repeated group should be interpreted as a required list of required
+    // elements, so the list element type is just the group itself.  Clip it.
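+    // (For instance, a parquet-protobuf style field written as
+    //   `repeated group f01 { optional int32 f010; optional double f011; }`
+    // is itself the element type; see the "parquet-protobuf style array" case in
+    // `ParquetSchemaSuite`.)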
+    if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
+      clipParquetType(parquetList, elementType)
+    } else {
+      assert(
+        parquetList.getOriginalType == OriginalType.LIST,
+        "Invalid Parquet schema. " +
+          "Original type of annotated Parquet lists must be LIST: " +
+          parquetList.toString)
+
+      assert(
+        parquetList.getFieldCount == 1 && parquetList.getType(0).isRepetition(Repetition.REPEATED),
+        "Invalid Parquet schema. " +
+          "LIST-annotated group should only have exactly one repeated field: " +
+          parquetList)
+
+      // Precondition: this method should only be called for lists with nested element types.
+      assert(!parquetList.getType(0).isPrimitive)
+
+      val repeatedGroup = parquetList.getType(0).asGroupType()
+
+      // If the repeated field is a group with multiple fields, or if it is a group with a single
+      // field named either "array" or "<list-name>_tuple" (the LIST-annotated group's name with
+      // "_tuple" appended), then the repeated type itself is the element type and elements are
+      // required.  Build a new LIST-annotated group with the clipped `repeatedGroup` as its only
+      // field.
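+      // (parquet-avro writes the repeated element group as `repeated group array { ... }`, and
+      // parquet-thrift appends "_tuple" to the list name; both layouts are exercised by the
+      // "parquet-avro style array" and "parquet-thrift style array" cases in ParquetSchemaSuite.)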
+      if (
+        repeatedGroup.getFieldCount > 1 ||
+        repeatedGroup.getName == "array" ||
+        repeatedGroup.getName == parquetList.getName + "_tuple"
+      ) {
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(clipParquetType(repeatedGroup, elementType))
+          .named(parquetList.getName)
+      } else {
+        // Otherwise, the repeated group is just the list-level wrapper (e.g. "list" or "bag"),
+        // and its single field carries the element type.  Keep the wrapper and clip the element
+        // inside it.
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(
+            Types
+              .repeatedGroup()
+              .addField(clipParquetType(repeatedGroup.getType(0), elementType))
+              .named(repeatedGroup.getName))
+          .named(parquetList.getName)
+      }
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[MapType]].  The value type
+   * of the [[MapType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
+   * [[StructType]].  Note that the key type of any [[MapType]] is always a primitive type.
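+   *
+   * Only the value side of the repeated key/value group is clipped; the key field is copied
+   * through unchanged.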
+   */
+  private def clipParquetMapType(
+      parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
+    // Precondition: this method should only be called for maps with nested value types.
+    assert(!isPrimitiveCatalystType(valueType))
+
+    val repeatedGroup = parquetMap.getType(0).asGroupType()
+    val parquetKeyType = repeatedGroup.getType(0)
+    val parquetValueType = repeatedGroup.getType(1)
+
+    val clippedRepeatedGroup =
+      Types
+        .repeatedGroup()
+        .as(repeatedGroup.getOriginalType)
+        .addField(parquetKeyType)
+        .addField(clipParquetType(parquetValueType, valueType))
+        .named(repeatedGroup.getName)
+
+    Types
+      .buildGroup(parquetMap.getRepetition)
+      .as(parquetMap.getOriginalType)
+      .addField(clippedRepeatedGroup)
+      .named(parquetMap.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst [[StructType]].
+   *
+   * @return A clipped [[GroupType]], which has at least one field.
+   * @note Parquet doesn't allow creating empty [[GroupType]] instances except for empty
+   *       [[MessageType]].  An empty requested schema is still legal (e.g. for column pruning),
+   *       but it is handled at the [[MessageType]] level in [[clipParquetSchema]], not here.
+   */
+  private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType)
+    Types
+      .buildGroup(parquetRecord.getRepetition)
+      .as(parquetRecord.getOriginalType)
+      .addFields(clippedParquetFields: _*)
+      .named(parquetRecord.getName)
+  }
+
+  /**
+   * Clips the fields of a Parquet [[GroupType]] according to the fields of a Catalyst
+   * [[StructType]].
+   *
+   * @return A list of clipped Parquet [[Type]] fields, which can be empty.
+   */
+  private def clipParquetGroupFields(
+      parquetRecord: GroupType, structType: StructType): Seq[Type] = {
+    val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+    val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
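+    // Fields that exist in the Parquet file are clipped recursively; fields that only exist in
+    // the requested Catalyst schema (e.g. columns gathered from other files during schema
+    // merging) are converted with `CatalystSchemaConverter` and simply read back as nulls.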
+    structType.map { f =>
+      parquetFieldMap
+        .get(f.name)
+        .map(clipParquetType(_, f.dataType))
+        .getOrElse(toParquet.convertField(f))
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index fe13dfbbed385af4538798ecea07b616fe95d4f7..f17e794b76650174c215fc1550ad7d5a82cf5c27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -113,31 +113,6 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
  * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
  * any "parent" container.
  *
- * @note Constructor argument [[parquetType]] refers to requested fields of the actual schema of the
- *       Parquet file being read, while constructor argument [[catalystType]] refers to requested
- *       fields of the global schema.  The key difference is that, in case of schema merging,
- *       [[parquetType]] can be a subset of [[catalystType]].  For example, it's possible to have
- *       the following [[catalystType]]:
- *       {{{
- *         new StructType()
- *           .add("f1", IntegerType, nullable = false)
- *           .add("f2", StringType, nullable = true)
- *           .add("f3", new StructType()
- *             .add("f31", DoubleType, nullable = false)
- *             .add("f32", IntegerType, nullable = true)
- *             .add("f33", StringType, nullable = true), nullable = false)
- *       }}}
- *       and the following [[parquetType]] (`f2` and `f32` are missing):
- *       {{{
- *         message root {
- *           required int32 f1;
- *           required group f3 {
- *             required double f31;
- *             optional binary f33 (utf8);
- *           }
- *         }
- *       }}}
- *
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record type
  * @param updater An updater which propagates converted field values to the parent container
@@ -179,31 +154,7 @@ private[parquet] class CatalystRowConverter(
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
-    // In case of schema merging, `parquetType` can be a subset of `catalystType`.  We need to pad
-    // those missing fields and create converters for them, although values of these fields are
-    // always null.
-    val paddedParquetFields = {
-      val parquetFields = parquetType.getFields.asScala
-      val parquetFieldNames = parquetFields.map(_.getName).toSet
-      val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name))
-
-      // We don't need to worry about feature flag arguments like `assumeBinaryIsString` when
-      // creating the schema converter here, since values of missing fields are always null.
-      val toParquet = new CatalystSchemaConverter()
-
-      (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f =>
-        catalystType.indexWhere(_.name == f.getName)
-      }
-    }
-
-    if (paddedParquetFields.length != catalystType.length) {
-      throw new UnsupportedOperationException(
-        "A Parquet file's schema has different number of fields with the table schema. " +
-          "Please enable schema merging by setting \"mergeSchema\" to true when load " +
-          "a Parquet dataset or set spark.sql.parquet.mergeSchema to true in SQLConf.")
-    }
-
-    paddedParquetFields.zip(catalystType).zipWithIndex.map {
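+    // Schema clipping in `CatalystReadSupport` ensures that `parquetType` contains exactly the
+    // requested fields, in the same order as `catalystType`, so the two can be zipped directly.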
+    parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
       case ((parquetFieldType, catalystField), ordinal) =>
         // Converted field value should be set to the `ordinal`-th cell of `currentRow`
         newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index be6c0545f5a0a80457b24ad35d67f6ccadf7e941..a21ab1dbb25d1b8d33764b9d151d9248a18b1271 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -55,16 +55,10 @@ import org.apache.spark.sql.{AnalysisException, SQLConf}
  *        to old style non-standard behaviors.
  */
 private[parquet] class CatalystSchemaConverter(
-    private val assumeBinaryIsString: Boolean,
-    private val assumeInt96IsTimestamp: Boolean,
-    private val followParquetFormatSpec: Boolean) {
-
-  // Only used when constructing converter for converting Spark SQL schema to Parquet schema, in
-  // which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are irrelevant.
-  def this() = this(
-    assumeBinaryIsString = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
-    assumeInt96IsTimestamp = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
-    followParquetFormatSpec = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get)
+    assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
+    assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
+    followParquetFormatSpec: Boolean = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get
+) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
index bd7cf8c10abefe0cc623964c4601aa68fb66a3d6..36b929ee1f409ebd1213650ce422d6e795a17ac4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.io.File
 import java.nio.ByteBuffer
 import java.util.{List => JList, Map => JMap}
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..83b65fb419ed328cb396df990dbeff0b34bb0427
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedSQLContext {
+  test("parquet files with different physical schemas but share the same logical schema") {
+    import ParquetCompatibilityTest._
+
+    // This test case writes two Parquet files, both representing the following Catalyst schema
+    //
+    //   StructType(
+    //     StructField(
+    //       "f",
+    //       ArrayType(IntegerType, containsNull = false),
+    //       nullable = false))
+    //
+    // The first Parquet file uses a parquet-avro style 2-level LIST-annotated group, while the
+    // other one uses a parquet-protobuf style 1-level unannotated repeated primitive field.
+    withTempDir { dir =>
+      val avroStylePath = new File(dir, "avro-style").getCanonicalPath
+      val protobufStylePath = new File(dir, "protobuf-style").getCanonicalPath
+
+      val avroStyleSchema =
+        """message avro_style {
+          |  required group f (LIST) {
+          |    repeated int32 array;
+          |  }
+          |}
+        """.stripMargin
+
+      writeDirect(avroStylePath, avroStyleSchema, { rc =>
+        rc.message {
+          rc.field("f", 0) {
+            rc.group {
+              rc.field("array", 0) {
+                rc.addInteger(0)
+                rc.addInteger(1)
+              }
+            }
+          }
+        }
+      })
+
+      logParquetSchema(avroStylePath)
+
+      val protobufStyleSchema =
+        """message protobuf_style {
+          |  repeated int32 f;
+          |}
+        """.stripMargin
+
+      writeDirect(protobufStylePath, protobufStyleSchema, { rc =>
+        rc.message {
+          rc.field("f", 0) {
+            rc.addInteger(2)
+            rc.addInteger(3)
+          }
+        }
+      })
+
+      logParquetSchema(protobufStylePath)
+
+      checkAnswer(
+        sqlContext.read.parquet(dir.getCanonicalPath),
+        Seq(
+          Row(Seq(0, 1)),
+          Row(Seq(2, 3))))
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index b7b70c2bbbd5c01b5b354d6012370fd6b4106e70..a379523d67f80f333ef332e45158b5932e932cd0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -229,4 +229,81 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-10301 Clipping nested structs in requested schema") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext
+        .range(1)
+        .selectExpr("NAMED_STRUCT('a', id, 'b', id) AS s")
+        .coalesce(1)
+
+      df.write.mode("append").parquet(path)
+
+      val userDefinedSchema = new StructType()
+        .add("s", new StructType().add("a", LongType, nullable = true), nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(0)))
+    }
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df1 = sqlContext
+        .range(1)
+        .selectExpr("NAMED_STRUCT('a', id, 'b', id) AS s")
+        .coalesce(1)
+
+      val df2 = sqlContext
+        .range(1, 2)
+        .selectExpr("NAMED_STRUCT('b', id, 'c', id) AS s")
+        .coalesce(1)
+
+      df1.write.parquet(path)
+      df2.write.mode(SaveMode.Append).parquet(path)
+
+      val userDefinedSchema = new StructType()
+        .add("s",
+          new StructType()
+            .add("a", LongType, nullable = true)
+            .add("c", LongType, nullable = true),
+          nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Seq(
+          Row(Row(0, null)),
+          Row(Row(null, 1))))
+    }
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df = sqlContext
+        .range(1)
+        .selectExpr("NAMED_STRUCT('a', ARRAY(NAMED_STRUCT('b', id, 'c', id))) AS s")
+        .coalesce(1)
+
+      df.write.parquet(path)
+
+      val userDefinedSchema = new StructType()
+        .add("s",
+          new StructType()
+            .add(
+              "a",
+              ArrayType(
+                new StructType()
+                  .add("b", LongType, nullable = true)
+                  .add("d", StringType, nullable = true),
+                containsNull = true),
+              nullable = true),
+          nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(Seq(Row(0, null)))))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 9dcbc1a047beac4398dfff4122e5b9938e0e6be2..28c59a4abdd7678c096c6bc339a3b9b9bd983752 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -22,6 +22,7 @@ import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.parquet.schema.MessageTypeParser
 
+import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -941,4 +942,313 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |  optional fixed_len_byte_array(8) f1 (DECIMAL(18, 3));
       |}
     """.stripMargin)
+
+  private def testSchemaClipping(
+      testName: String,
+      parquetSchema: String,
+      catalystSchema: StructType,
+      expectedSchema: String): Unit = {
+    test(s"Clipping - $testName") {
+      val expected = MessageTypeParser.parseMessageType(expectedSchema)
+      val actual = CatalystReadSupport.clipParquetSchema(
+        MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
+
+      try {
+        expected.checkContains(actual)
+        actual.checkContains(expected)
+      } catch { case cause: Throwable =>
+        fail(
+          s"""Expected clipped schema:
+             |$expected
+             |Actual clipped schema:
+             |$actual
+           """.stripMargin,
+          cause)
+      }
+    }
+  }
+
+  testSchemaClipping(
+    "simple nested struct",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    optional int32 f00;
+        |    optional int32 f01;
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f0Type = new StructType().add("f00", IntegerType, nullable = true)
+      new StructType()
+        .add("f0", f0Type, nullable = false)
+        .add("f1", IntegerType, nullable = true)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    optional int32 f00;
+        |  }
+        |  optional int32 f1;
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "parquet-protobuf style array",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    repeated binary f00 (UTF8);
+        |    repeated group f01 {
+        |      optional int32 f010;
+        |      optional double f011;
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f00Type = ArrayType(StringType, containsNull = false)
+      val f01Type = ArrayType(
+        new StructType().add("f011", DoubleType, nullable = true),
+        containsNull = false)
+
+      val f0Type = new StructType()
+        .add("f00", f00Type, nullable = false)
+        .add("f01", f01Type, nullable = false)
+
+      val f1Type = ArrayType(IntegerType, containsNull = true)
+
+      new StructType()
+        .add("f0", f0Type, nullable = false)
+        .add("f1", f1Type, nullable = true)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    repeated binary f00 (UTF8);
+        |    repeated group f01 {
+        |      optional double f011;
+        |    }
+        |  }
+        |
+        |  optional group f1 (LIST) {
+        |    repeated group list {
+        |      optional int32 element;
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "parquet-thrift style array",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 {
+        |      repeated binary f00_tuple (UTF8);
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group f01_tuple {
+        |        optional int32 f010;
+        |        optional double f011;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f01ElementType = new StructType()
+        .add("f011", DoubleType, nullable = true)
+        .add("f012", LongType, nullable = true)
+
+      val f0Type = new StructType()
+        .add("f00", ArrayType(StringType, containsNull = false), nullable = false)
+        .add("f01", ArrayType(f01ElementType, containsNull = false), nullable = false)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 {
+        |      repeated binary f00_tuple (UTF8);
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group f01_tuple {
+        |        optional double f011;
+        |        optional int64 f012;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "parquet-avro style array",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 {
+        |      repeated binary array (UTF8);
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group array {
+        |        optional int32 f010;
+        |        optional double f011;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f01ElementType = new StructType()
+        .add("f011", DoubleType, nullable = true)
+        .add("f012", LongType, nullable = true)
+
+      val f0Type = new StructType()
+        .add("f00", ArrayType(StringType, containsNull = false), nullable = false)
+        .add("f01", ArrayType(f01ElementType, containsNull = false), nullable = false)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 {
+        |      repeated binary array (UTF8);
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group array {
+        |        optional double f011;
+        |        optional int64 f012;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "parquet-hive style array",
+
+    parquetSchema =
+      """message root {
+        |  optional group f0 {
+        |    optional group f00 (LIST) {
+        |      repeated group bag {
+        |        optional binary array_element;
+        |      }
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group bag {
+        |        optional group array_element {
+        |          optional int32 f010;
+        |          optional double f011;
+        |        }
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f01ElementType = new StructType()
+        .add("f011", DoubleType, nullable = true)
+        .add("f012", LongType, nullable = true)
+
+      val f0Type = new StructType()
+        .add("f00", ArrayType(StringType, containsNull = true), nullable = true)
+        .add("f01", ArrayType(f01ElementType, containsNull = true), nullable = true)
+
+      new StructType().add("f0", f0Type, nullable = true)
+    },
+
+    expectedSchema =
+      """message root {
+        |  optional group f0 {
+        |    optional group f00 (LIST) {
+        |      repeated group bag {
+        |        optional binary array_element;
+        |      }
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group bag {
+        |        optional group array_element {
+        |          optional double f011;
+        |          optional int64 f012;
+        |        }
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "2-level list of required struct",
+
+    parquetSchema =
+      s"""message root {
+         |  required group f0 {
+         |    required group f00 (LIST) {
+         |      repeated group element {
+         |        required int32 f000;
+         |        optional int64 f001;
+         |      }
+         |    }
+         |  }
+         |}
+       """.stripMargin,
+
+    catalystSchema = {
+      val f00ElementType =
+        new StructType()
+          .add("f001", LongType, nullable = true)
+          .add("f002", DoubleType, nullable = false)
+
+      val f00Type = ArrayType(f00ElementType, containsNull = false)
+      val f0Type = new StructType().add("f00", f00Type, nullable = false)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      s"""message root {
+         |  required group f0 {
+         |    required group f00 (LIST) {
+         |      repeated group element {
+         |        optional int64 f001;
+         |        required double f002;
+         |      }
+         |    }
+         |  }
+         |}
+       """.stripMargin)
+
+  testSchemaClipping(
+    "empty requested schema",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    required int32 f00;
+        |    required int64 f01;
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = new StructType(),
+
+    expectedSchema = "message root {}")
 }