diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index da0c92864e9bd1dd006934811bce3294741c8270..c9e7e7fe633b8a1736007d50b1858e1afb12e56e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -424,13 +424,13 @@ object StructType extends AbstractDataType {
         if ((leftPrecision == rightPrecision) && (leftScale == rightScale)) {
           DecimalType(leftPrecision, leftScale)
         } else if ((leftPrecision != rightPrecision) && (leftScale != rightScale)) {
-          throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+          throw new SparkException("Failed to merge decimal types with incompatible " +
             s"precision $leftPrecision and $rightPrecision & scale $leftScale and $rightScale")
         } else if (leftPrecision != rightPrecision) {
-          throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+          throw new SparkException("Failed to merge decimal types with incompatible " +
             s"precision $leftPrecision and $rightPrecision")
         } else {
-          throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
-            s"scala $leftScale and $rightScale")
+          throw new SparkException("Failed to merge decimal types with incompatible " +
+            s"scale $leftScale and $rightScale")
         }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index f87590095d3441e6e681df1adc14d57a597d2a95..1e686d41f41db477621acf1e54c686999f0535b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -800,12 +800,41 @@ private[sql] object ParquetRelation extends Logging {
               assumeInt96IsTimestamp = assumeInt96IsTimestamp,
               writeLegacyParquetFormat = writeLegacyParquetFormat)
 
-          footers.map { footer =>
-            ParquetRelation.readSchemaFromFooter(footer, converter)
-          }.reduceLeftOption(_ merge _).iterator
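+          // Fold over the footers one at a time instead of using reduceLeftOption, so that a
+          // merge failure can be reported together with the file holding the conflicting schema.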
+          if (footers.isEmpty) {
+            Iterator.empty
+          } else {
+            var mergedSchema = ParquetRelation.readSchemaFromFooter(footers.head, converter)
+            footers.tail.foreach { footer =>
+              val schema = ParquetRelation.readSchemaFromFooter(footer, converter)
+              try {
+                mergedSchema = mergedSchema.merge(schema)
+              } catch { case cause: SparkException =>
+                throw new SparkException(
+                  s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause)
+              }
+            }
+            Iterator.single(mergedSchema)
+          }
         }.collect()
 
-    partiallyMergedSchemas.reduceLeftOption(_ merge _)
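+    // Merge the per-task partial schemas on the driver, again folding manually so that the
+    // conflicting partial schema can be included in the error message.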
+    if (partiallyMergedSchemas.isEmpty) {
+      None
+    } else {
+      var finalSchema = partiallyMergedSchemas.head
+      partiallyMergedSchemas.tail.foreach { schema =>
+        try {
+          finalSchema = finalSchema.merge(schema)
+        } catch { case cause: SparkException =>
+          throw new SparkException(
+            s"Failed merging schema:\n${schema.treeString}", cause)
+        }
+      }
+      Some(finalSchema)
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 1796b3af0e37aa24a9bcfda96a2603120d35d4bd..3ded32c450541c6ff47636a336d07dc18460119c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -421,6 +421,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         // We will remove the temporary metadata when writing Parquet file.
         val forPathSix = sqlContext.read.parquet(pathSix).schema
         assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
+
+        // Sanity check: make sure the optional-field metadata is not set by mistake.
+        val pathSeven = s"${dir.getCanonicalPath}/table7"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
+        val pathEight = s"${dir.getCanonicalPath}/table8"
+        (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
+
+        val df2 = sqlContext.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
+        checkAnswer(
+          df2,
+          Row(1, "1"))
+
+        // The fields "a" and "b" exist in both Parquet files, so no optional-field metadata is set.
+        assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
+        assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 60fa81b1ab819fb07e876556f4eb84a6a9b163c6..d860651d421f8022c7d8e509a6bcc9b0cff787f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -22,6 +22,7 @@ import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.parquet.schema.MessageTypeParser
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -449,6 +450,38 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }.getMessage.contains("detected conflicting schemas"))
   }
 
+  test("schema merging failure error message") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
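+      // "id" is a LongType under p=1 but an IntegerType under p=2, so schema merging fails.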
+      sqlContext.range(3).write.parquet(s"$path/p=1")
+      sqlContext.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2")
+
+      val message = intercept[SparkException] {
+        sqlContext.read.option("mergeSchema", "true").parquet(path).schema
+      }.getMessage
+
+      assert(message.contains("Failed merging schema of file"))
+    }
+
+    // Test the second merge step, which runs after the Parquet schemas are read in parallel.
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      sqlContext.range(3).write.parquet(s"$path/p=1")
+      sqlContext.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2")
+
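+      // Bump the parallelism so each task reads a single footer; the per-task merge then succeeds
+      // and the conflict only surfaces in the final merge on the driver.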
+      sqlContext.sparkContext.conf.set("spark.default.parallelism", "20")
+
+      val message = intercept[SparkException] {
+        sqlContext.read.option("mergeSchema", "true").parquet(path).schema
+      }.getMessage
+
+      assert(message.contains("Failed merging schema:"))
+    }
+  }
+
   // =======================================================
   // Tests for converting Parquet LIST to Catalyst ArrayType
   // =======================================================