diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 909b8e31f24580947a8fbffc3ca88bd2a5badcf0..c11dab35cdf6f26aacef6341b1cf965eaa94d281 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -108,7 +108,9 @@ final class Decimal extends Ordered[Decimal] with Serializable {
    */
   def set(decimal: BigDecimal, precision: Int, scale: Int): Decimal = {
     this.decimalVal = decimal.setScale(scale, ROUNDING_MODE)
-    require(decimalVal.precision <= precision, "Overflowed precision")
+    require(
+      decimalVal.precision <= precision,
+      s"Decimal precision ${decimalVal.precision} exceeds max precision $precision")
     this.longVal = 0L
     this._precision = precision
     this._scale = scale
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 8f0f8910b36ab2751de2ff55a74666768a803e4d..47397c4be3cb69fa650e6e48404fcd57e8b1285f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -292,10 +292,9 @@ private[spark] object SQLConf {
 
   val PARQUET_WRITE_LEGACY_FORMAT = booleanConf(
     key = "spark.sql.parquet.writeLegacyFormat",
-    defaultValue = Some(true),
+    defaultValue = Some(false),
     doc = "Whether to follow Parquet's format specification when converting Parquet schema to " +
-      "Spark SQL schema and vice versa.",
-    isPublic = false)
+      "Spark SQL schema and vice versa.")
 
   val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
     key = "spark.sql.parquet.output.committer.class",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 532569803409507e943a489d86ce2886fbf586b6..a958373eb769d81a3376f88fe4446fd51ad0a8f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -95,7 +95,9 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
        """.stripMargin
     }
 
-    new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
+    new CatalystRecordMaterializer(
+      parquetRequestedSchema,
+      CatalystReadSupport.expandUDT(catalystRequestedSchema))
   }
 }
 
@@ -110,7 +112,10 @@ private[parquet] object CatalystReadSupport {
    */
   def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
     val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
-    Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
+    Types
+      .buildMessage()
+      .addFields(clippedParquetFields: _*)
+      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
   }
 
   private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
@@ -271,4 +276,30 @@ private[parquet] object CatalystReadSupport {
         .getOrElse(toParquet.convertField(f))
     }
   }
+
+  def expandUDT(schema: StructType): StructType = {
+    def expand(dataType: DataType): DataType = {
+      dataType match {
+        case t: ArrayType =>
+          t.copy(elementType = expand(t.elementType))
+
+        case t: MapType =>
+          t.copy(
+            keyType = expand(t.keyType),
+            valueType = expand(t.valueType))
+
+        case t: StructType =>
+          val expandedFields = t.fields.map(f => f.copy(dataType = expand(f.dataType)))
+          t.copy(fields = expandedFields)
+
+        case t: UserDefinedType[_] =>
+          t.sqlType
+
+        case t =>
+          t
+      }
+    }
+
+    expand(schema).asInstanceOf[StructType]
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 050d3610a6413e68d7d363dfd98721b8391e72b7..247d35363b862739c2a38c048a30912956a8d890 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -27,7 +27,6 @@ import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
 import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
-import org.apache.parquet.schema.Type.Repetition
 import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
 
 import org.apache.spark.Logging
@@ -114,7 +113,8 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
  * any "parent" container.
  *
  * @param parquetType Parquet schema of Parquet records
- * @param catalystType Spark SQL schema that corresponds to the Parquet record type
+ * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
+ *        types should have been expanded.
  * @param updater An updater which propagates converted field values to the parent container
  */
 private[parquet] class CatalystRowConverter(
@@ -133,6 +133,12 @@ private[parquet] class CatalystRowConverter(
        |${catalystType.prettyJson}
      """.stripMargin)
 
+  assert(
+    !catalystType.existsRecursively(_.isInstanceOf[UserDefinedType[_]]),
+    s"""User-defined types in Catalyst schema should have already been expanded:
+       |${catalystType.prettyJson}
+     """.stripMargin)
+
   logDebug(
     s"""Building row converter for the following schema:
        |
@@ -268,13 +274,6 @@ private[parquet] class CatalystRowConverter(
           override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
         })
 
-      case t: UserDefinedType[_] =>
-        val catalystTypeForUDT = t.sqlType
-        val nullable = parquetType.isRepetition(Repetition.OPTIONAL)
-        val field = StructField("udt", catalystTypeForUDT, nullable)
-        val parquetTypeForUDT = new CatalystSchemaConverter().convertField(field)
-        newConverter(parquetTypeForUDT, catalystTypeForUDT, updater)
-
       case _ =>
         throw new RuntimeException(
           s"Unable to create Parquet converter for data type ${catalystType.json}")
@@ -340,30 +339,36 @@ private[parquet] class CatalystRowConverter(
       val scale = decimalType.scale
 
       if (precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64) {
-        // Constructs a `Decimal` with an unscaled `Long` value if possible.  The underlying
-        // `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we are using
-        // `Binary.toByteBuffer.array()` to steal the underlying byte array without copying it.
-        val buffer = value.toByteBuffer
-        val bytes = buffer.array()
-        val start = buffer.position()
-        val end = buffer.limit()
-
-        var unscaled = 0L
-        var i = start
-
-        while (i < end) {
-          unscaled = (unscaled << 8) | (bytes(i) & 0xff)
-          i += 1
-        }
-
-        val bits = 8 * (end - start)
-        unscaled = (unscaled << (64 - bits)) >> (64 - bits)
+        // Constructs a `Decimal` with an unscaled `Long` value if possible.
+        val unscaled = binaryToUnscaledLong(value)
         Decimal(unscaled, precision, scale)
       } else {
         // Otherwise, resorts to an unscaled `BigInteger` instead.
         Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), precision, scale)
       }
     }
+
+    private def binaryToUnscaledLong(binary: Binary): Long = {
+      // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here
+      // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without
+      // copying it.
+      val buffer = binary.toByteBuffer
+      val bytes = buffer.array()
+      val start = buffer.position()
+      val end = buffer.limit()
+
+      var unscaled = 0L
+      var i = start
+
+      while (i < end) {
+        unscaled = (unscaled << 8) | (bytes(i) & 0xff)
+        i += 1
+      }
+
+      val bits = 8 * (end - start)
+      unscaled = (unscaled << (64 - bits)) >> (64 - bits)
+      unscaled
+    }
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 6904fc736c1062a35359da0280ec2e05fe025507..7f3394c20ed3d410080467c701b5ce5f865eaea0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -121,7 +121,7 @@ private[parquet] class CatalystSchemaConverter(
       val precision = field.getDecimalMetadata.getPrecision
       val scale = field.getDecimalMetadata.getScale
 
-      CatalystSchemaConverter.analysisRequire(
+      CatalystSchemaConverter.checkConversionRequirement(
         maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
         s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)")
 
@@ -155,7 +155,7 @@ private[parquet] class CatalystSchemaConverter(
         }
 
       case INT96 =>
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           assumeInt96IsTimestamp,
           "INT96 is not supported unless it's interpreted as timestamp. " +
             s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.")
@@ -197,11 +197,11 @@ private[parquet] class CatalystSchemaConverter(
       //
       // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
       case LIST =>
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           field.getFieldCount == 1, s"Invalid list type $field")
 
         val repeatedType = field.getType(0)
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
 
         if (isElementType(repeatedType, field.getName)) {
@@ -217,17 +217,17 @@ private[parquet] class CatalystSchemaConverter(
       // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
       // scalastyle:on
       case MAP | MAP_KEY_VALUE =>
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           field.getFieldCount == 1 && !field.getType(0).isPrimitive,
           s"Invalid map type: $field")
 
         val keyValueType = field.getType(0).asGroupType()
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2,
           s"Invalid map type: $field")
 
         val keyType = keyValueType.getType(0)
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           keyType.isPrimitive,
           s"Map key type is expected to be a primitive type, but found: $keyType")
 
@@ -299,7 +299,10 @@ private[parquet] class CatalystSchemaConverter(
    * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
    */
   def convert(catalystSchema: StructType): MessageType = {
-    Types.buildMessage().addFields(catalystSchema.map(convertField): _*).named("root")
+    Types
+      .buildMessage()
+      .addFields(catalystSchema.map(convertField): _*)
+      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
   }
 
   /**
@@ -347,10 +350,10 @@ private[parquet] class CatalystSchemaConverter(
       // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec.
       //
       // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond
-      // timestamp in Impala for some historical reasons, it's not recommended to be used for any
-      // other types and will probably be deprecated in future Parquet format spec.  That's the
-      // reason why Parquet format spec only defines `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` which
-      // are both logical types annotating `INT64`.
+      // timestamp in Impala for some historical reasons.  It's not recommended to be used for any
+      // other types and will probably be deprecated in some future version of parquet-format spec.
+      // That's the reason why parquet-format spec only defines `TIMESTAMP_MILLIS` and
+      // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`.
       //
       // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive.  Starting
       // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
@@ -361,7 +364,7 @@ private[parquet] class CatalystSchemaConverter(
       // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
       // hasn't implemented `TIMESTAMP_MICROS` yet.
       //
-      // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
+      // TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.
       case TimestampType =>
         Types.primitive(INT96, repetition).named(field.name)
 
@@ -523,11 +526,12 @@ private[parquet] class CatalystSchemaConverter(
   }
 }
 
-
 private[parquet] object CatalystSchemaConverter {
+  val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
+
   def checkFieldName(name: String): Unit = {
     // ,;{}()\n\t= and space are special characters in Parquet schema
-    analysisRequire(
+    checkConversionRequirement(
       !name.matches(".*[ ,;{}()\n\t=].*"),
       s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
          |Please use alias to rename it.
@@ -539,7 +543,7 @@ private[parquet] object CatalystSchemaConverter {
     schema
   }
 
-  def analysisRequire(f: => Boolean, message: String): Unit = {
+  def checkConversionRequirement(f: => Boolean, message: String): Unit = {
     if (!f) {
       throw new AnalysisException(message)
     }
@@ -553,16 +557,8 @@ private[parquet] object CatalystSchemaConverter {
     numBytes
   }
 
-  private val MIN_BYTES_FOR_PRECISION = Array.tabulate[Int](39)(computeMinBytesForPrecision)
-
   // Returns the minimum number of bytes needed to store a decimal with a given `precision`.
-  def minBytesForPrecision(precision : Int) : Int = {
-    if (precision < MIN_BYTES_FOR_PRECISION.length) {
-      MIN_BYTES_FOR_PRECISION(precision)
-    } else {
-      computeMinBytesForPrecision(precision)
-    }
-  }
+  val minBytesForPrecision = Array.tabulate[Int](39)(computeMinBytesForPrecision)
 
   val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4) /* 9 */
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
new file mode 100644
index 0000000000000000000000000000000000000000..483363d2c1a211dd5797a394fb6b33e8cd67df3b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.nio.{ByteBuffer, ByteOrder}
+import java.util
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.column.ParquetProperties
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
+import org.apache.parquet.io.api.{Binary, RecordConsumer}
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.{MAX_PRECISION_FOR_INT32, MAX_PRECISION_FOR_INT64, minBytesForPrecision}
+import org.apache.spark.sql.types._
+
+/**
+ * A Parquet [[WriteSupport]] implementation that writes Catalyst [[InternalRow]]s as Parquet
+ * messages.  This class can write Parquet data in two modes:
+ *
+ *  - Standard mode: Parquet data are written in standard format defined in parquet-format spec.
+ *  - Legacy mode: Parquet data are written in legacy format compatible with Spark 1.4 and prior.
+ *
+ * This behavior can be controlled by SQL option `spark.sql.parquet.writeLegacyFormat`.  The value
+ * of this option is propagated to this class by the `init()` method and its Hadoop configuration
+ * argument.
+ */
+private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging {
+  // A `ValueWriter` is responsible for writing a field of an `InternalRow` to the record consumer.
+  // Here we are using `SpecializedGetters` rather than `InternalRow` so that we can directly access
+  // data in `ArrayData` without the help of `SpecificMutableRow`.
+  private type ValueWriter = (SpecializedGetters, Int) => Unit
+
+  // Schema of the `InternalRow`s to be written
+  private var schema: StructType = _
+
+  // `ValueWriter`s for all fields of the schema
+  private var rootFieldWriters: Seq[ValueWriter] = _
+
+  // The Parquet `RecordConsumer` to which all `InternalRow`s are written
+  private var recordConsumer: RecordConsumer = _
+
+  // Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions
+  private var writeLegacyParquetFormat: Boolean = _
+
+  // Reusable byte array used to write timestamps as Parquet INT96 values
+  private val timestampBuffer = new Array[Byte](12)
+
+  // Reusable byte array used to write decimal values
+  private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION))
+
+  override def init(configuration: Configuration): WriteContext = {
+    val schemaString = configuration.get(CatalystWriteSupport.SPARK_ROW_SCHEMA)
+    this.schema = StructType.fromString(schemaString)
+    this.writeLegacyParquetFormat = {
+      // `SQLConf.PARQUET_WRITE_LEGACY_FORMAT` should always be explicitly set in ParquetRelation
+      assert(configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key) != null)
+      configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean
+    }
+    this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
+
+    val messageType = new CatalystSchemaConverter(configuration).convert(schema)
+    val metadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
+
+    logInfo(
+      s"""Initialized Parquet WriteSupport with Catalyst schema:
+         |${schema.prettyJson}
+         |and corresponding Parquet message type:
+         |$messageType
+       """.stripMargin)
+
+    new WriteContext(messageType, metadata)
+  }
+
+  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
+    this.recordConsumer = recordConsumer
+  }
+
+  override def write(row: InternalRow): Unit = {
+    consumeMessage {
+      writeFields(row, schema, rootFieldWriters)
+    }
+  }
+
+  private def writeFields(
+      row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
+    var i = 0
+    while (i < row.numFields) {
+      if (!row.isNullAt(i)) {
+        consumeField(schema(i).name, i) {
+          fieldWriters(i).apply(row, i)
+        }
+      }
+      i += 1
+    }
+  }
+
+  private def makeWriter(dataType: DataType): ValueWriter = {
+    dataType match {
+      case BooleanType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addBoolean(row.getBoolean(ordinal))
+
+      case ByteType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addInteger(row.getByte(ordinal))
+
+      case ShortType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addInteger(row.getShort(ordinal))
+
+      case IntegerType | DateType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addInteger(row.getInt(ordinal))
+
+      case LongType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addLong(row.getLong(ordinal))
+
+      case FloatType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addFloat(row.getFloat(ordinal))
+
+      case DoubleType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addDouble(row.getDouble(ordinal))
+
+      case StringType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addBinary(Binary.fromByteArray(row.getUTF8String(ordinal).getBytes))
+
+      case TimestampType =>
+        (row: SpecializedGetters, ordinal: Int) => {
+          // TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it
+          // Currently we only support timestamps stored as INT96, which is compatible with Hive
+          // and Impala.  However, INT96 is to be deprecated.  We plan to support `TIMESTAMP_MICROS`
+          // defined in the parquet-format spec.  But up until writing, the most recent parquet-mr
+          // version (1.8.1) hasn't implemented it yet.
+
+          // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
+          // precision.  Nanosecond parts of timestamp values read from INT96 are simply stripped.
+          val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
+          val buf = ByteBuffer.wrap(timestampBuffer)
+          buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
+          recordConsumer.addBinary(Binary.fromByteArray(timestampBuffer))
+        }
+
+      case BinaryType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          recordConsumer.addBinary(Binary.fromByteArray(row.getBinary(ordinal)))
+
+      case DecimalType.Fixed(precision, scale) =>
+        makeDecimalWriter(precision, scale)
+
+      case t: StructType =>
+        val fieldWriters = t.map(_.dataType).map(makeWriter)
+        (row: SpecializedGetters, ordinal: Int) =>
+          consumeGroup {
+            writeFields(row.getStruct(ordinal, t.length), t, fieldWriters)
+          }
+
+      case t: ArrayType => makeArrayWriter(t)
+
+      case t: MapType => makeMapWriter(t)
+
+      case t: UserDefinedType[_] => makeWriter(t.sqlType)
+
+      // TODO Adds IntervalType support
+      case _ => sys.error(s"Unsupported data type $dataType.")
+    }
+  }
+
+  private def makeDecimalWriter(precision: Int, scale: Int): ValueWriter = {
+    assert(
+      precision <= DecimalType.MAX_PRECISION,
+      s"Decimal precision $precision exceeds max precision ${DecimalType.MAX_PRECISION}")
+
+    val numBytes = minBytesForPrecision(precision)
+
+    val int32Writer =
+      (row: SpecializedGetters, ordinal: Int) => {
+        val unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong
+        recordConsumer.addInteger(unscaledLong.toInt)
+      }
+
+    val int64Writer =
+      (row: SpecializedGetters, ordinal: Int) => {
+        val unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong
+        recordConsumer.addLong(unscaledLong)
+      }
+
+    val binaryWriterUsingUnscaledLong =
+      (row: SpecializedGetters, ordinal: Int) => {
+        // When the precision is low enough (<= 18) to squeeze the decimal value into a `Long`, we
+        // can build a fixed-length byte array with length `numBytes` using the unscaled `Long`
+        // value and the `decimalBuffer` for better performance.
+        val unscaled = row.getDecimal(ordinal, precision, scale).toUnscaledLong
+        var i = 0
+        var shift = 8 * (numBytes - 1)
+
+        while (i < numBytes) {
+          decimalBuffer(i) = (unscaled >> shift).toByte
+          i += 1
+          shift -= 8
+        }
+
+        recordConsumer.addBinary(Binary.fromByteArray(decimalBuffer, 0, numBytes))
+      }
+
+    val binaryWriterUsingUnscaledBytes =
+      (row: SpecializedGetters, ordinal: Int) => {
+        val decimal = row.getDecimal(ordinal, precision, scale)
+        val bytes = decimal.toJavaBigDecimal.unscaledValue().toByteArray
+        val fixedLengthBytes = if (bytes.length == numBytes) {
+          // If the length of the underlying byte array of the unscaled `BigInteger` happens to be
+          // `numBytes`, just reuse it, so that we don't bother copying it to `decimalBuffer`.
+          bytes
+        } else {
+          // Otherwise, the length must be less than `numBytes`.  In this case we copy contents of
+          // the underlying bytes with padding sign bytes to `decimalBuffer` to form the result
+          // fixed-length byte array.
+          val signByte = if (bytes.head < 0) -1: Byte else 0: Byte
+          util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte)
+          System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length)
+          decimalBuffer
+        }
+
+        recordConsumer.addBinary(Binary.fromByteArray(fixedLengthBytes, 0, numBytes))
+      }
+
+    writeLegacyParquetFormat match {
+      // Standard mode, 1 <= precision <= 9, writes as INT32
+      case false if precision <= MAX_PRECISION_FOR_INT32 => int32Writer
+
+      // Standard mode, 10 <= precision <= 18, writes as INT64
+      case false if precision <= MAX_PRECISION_FOR_INT64 => int64Writer
+
+      // Legacy mode, 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY
+      case true if precision <= MAX_PRECISION_FOR_INT64 => binaryWriterUsingUnscaledLong
+
+      // Either standard or legacy mode, 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY
+      case _ => binaryWriterUsingUnscaledBytes
+    }
+  }
+
+  def makeArrayWriter(arrayType: ArrayType): ValueWriter = {
+    val elementWriter = makeWriter(arrayType.elementType)
+
+    def threeLevelArrayWriter(repeatedGroupName: String, elementFieldName: String): ValueWriter =
+      (row: SpecializedGetters, ordinal: Int) => {
+        val array = row.getArray(ordinal)
+        consumeGroup {
+          // Only creates the repeated field if the array is non-empty.
+          if (array.numElements() > 0) {
+            consumeField(repeatedGroupName, 0) {
+              var i = 0
+              while (i < array.numElements()) {
+                consumeGroup {
+                  // Only creates the element field if the current array element is not null.
+                  if (!array.isNullAt(i)) {
+                    consumeField(elementFieldName, 0) {
+                      elementWriter.apply(array, i)
+                    }
+                  }
+                }
+                i += 1
+              }
+            }
+          }
+        }
+      }
+
+    def twoLevelArrayWriter(repeatedFieldName: String): ValueWriter =
+      (row: SpecializedGetters, ordinal: Int) => {
+        val array = row.getArray(ordinal)
+        consumeGroup {
+          // Only creates the repeated field if the array is non-empty.
+          if (array.numElements() > 0) {
+            consumeField(repeatedFieldName, 0) {
+              var i = 0
+              while (i < array.numElements()) {
+                elementWriter.apply(array, i)
+                i += 1
+              }
+            }
+          }
+        }
+      }
+
+    (writeLegacyParquetFormat, arrayType.containsNull) match {
+      case (legacyMode @ false, _) =>
+        // Standard mode:
+        //
+        //   <list-repetition> group <name> (LIST) {
+        //     repeated group list {
+        //                    ^~~~  repeatedGroupName
+        //       <element-repetition> <element-type> element;
+        //                                           ^~~~~~~  elementFieldName
+        //     }
+        //   }
+        threeLevelArrayWriter(repeatedGroupName = "list", elementFieldName = "element")
+
+      case (legacyMode @ true, nullableElements @ true) =>
+        // Legacy mode, with nullable elements:
+        //
+        //   <list-repetition> group <name> (LIST) {
+        //     optional group bag {
+        //                    ^~~  repeatedGroupName
+        //       repeated <element-type> array;
+        //                               ^~~~~ elementFieldName
+        //     }
+        //   }
+        threeLevelArrayWriter(repeatedGroupName = "bag", elementFieldName = "array")
+
+      case (legacyMode @ true, nullableElements @ false) =>
+        // Legacy mode, with non-nullable elements:
+        //
+        //   <list-repetition> group <name> (LIST) {
+        //     repeated <element-type> array;
+        //                             ^~~~~  repeatedFieldName
+        //   }
+        twoLevelArrayWriter(repeatedFieldName = "array")
+    }
+  }
+
+  private def makeMapWriter(mapType: MapType): ValueWriter = {
+    val keyWriter = makeWriter(mapType.keyType)
+    val valueWriter = makeWriter(mapType.valueType)
+    val repeatedGroupName = if (writeLegacyParquetFormat) {
+      // Legacy mode:
+      //
+      //   <map-repetition> group <name> (MAP) {
+      //     repeated group map (MAP_KEY_VALUE) {
+      //                    ^~~  repeatedGroupName
+      //       required <key-type> key;
+      //       <value-repetition> <value-type> value;
+      //     }
+      //   }
+      "map"
+    } else {
+      // Standard mode:
+      //
+      //   <map-repetition> group <name> (MAP) {
+      //     repeated group key_value {
+      //                    ^~~~~~~~~  repeatedGroupName
+      //       required <key-type> key;
+      //       <value-repetition> <value-type> value;
+      //     }
+      //   }
+      "key_value"
+    }
+
+    (row: SpecializedGetters, ordinal: Int) => {
+      val map = row.getMap(ordinal)
+      val keyArray = map.keyArray()
+      val valueArray = map.valueArray()
+
+      consumeGroup {
+        // Only creates the repeated field if the map is non-empty.
+        if (map.numElements() > 0) {
+          consumeField(repeatedGroupName, 0) {
+            var i = 0
+            while (i < map.numElements()) {
+              consumeGroup {
+                consumeField("key", 0) {
+                  keyWriter.apply(keyArray, i)
+                }
+
+                // Only creates the "value" field if the value if non-empty
+                if (!map.valueArray().isNullAt(i)) {
+                  consumeField("value", 1) {
+                    valueWriter.apply(valueArray, i)
+                  }
+                }
+              }
+              i += 1
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private def consumeMessage(f: => Unit): Unit = {
+    recordConsumer.startMessage()
+    f
+    recordConsumer.endMessage()
+  }
+
+  private def consumeGroup(f: => Unit): Unit = {
+    recordConsumer.startGroup()
+    f
+    recordConsumer.endGroup()
+  }
+
+  private def consumeField(field: String, index: Int)(f: => Unit): Unit = {
+    recordConsumer.startField(field, index)
+    f
+    recordConsumer.endField(field, index)
+  }
+}
+
+private[parquet] object CatalystWriteSupport {
+  val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
+
+  def setSchema(schema: StructType, configuration: Configuration): Unit = {
+    schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
+    configuration.set(SPARK_ROW_SCHEMA, schema.json)
+    configuration.set(
+      ParquetOutputFormat.WRITER_VERSION,
+      ParquetProperties.WriterVersion.PARQUET_1_0.toString)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
index de1fd0166ac5a2fe810f6c801a23949876552773..300e8677b312fad19f963f648a1758b85d1803d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
@@ -39,7 +39,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetO
  *
  *   NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's
  *   no safe way undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
- *   left * empty).
+ *   left empty).
  */
 private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala
deleted file mode 100644
index ccd7ebf319af9ec45fe0d39aeb043407845d0034..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.{MapData, ArrayData}
-
-// TODO Removes this while fixing SPARK-8848
-private[sql] object CatalystConverter {
-  // This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
-  // Note that "array" for the array elements is chosen by ParquetAvro.
-  // Using a different value will result in Parquet silently dropping columns.
-  val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
-  val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
-
-  val MAP_KEY_SCHEMA_NAME = "key"
-  val MAP_VALUE_SCHEMA_NAME = "value"
-  val MAP_SCHEMA_NAME = "map"
-
-  // TODO: consider using Array[T] for arrays to avoid boxing of primitive types
-  type ArrayScalaType = ArrayData
-  type StructScalaType = InternalRow
-  type MapScalaType = MapData
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index c6b3fe7900da8e22683a5f7e1ba2cd10ccc81905..78040d99fb0a5044f6a134656ee61255ab600d8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -18,24 +18,17 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.Serializable
-import java.nio.ByteBuffer
 
-import com.google.common.io.BaseEncoding
-import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.io.api.Binary
 import org.apache.parquet.schema.OriginalType
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 
-import org.apache.spark.SparkEnv
-import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
 
 private[sql] object ParquetFilters {
-  val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
-
   case class SetInFilter[T <: Comparable[T]](
     valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {
 
@@ -282,33 +275,4 @@ private[sql] object ParquetFilters {
     addMethod.setAccessible(true)
     addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
   }
-
-  /**
-   * Note: Inside the Hadoop API we only have access to `Configuration`, not to
-   * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
-   * the actual filter predicate.
-   */
-  def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
-    if (filters.nonEmpty) {
-      val serialized: Array[Byte] =
-        SparkEnv.get.closureSerializer.newInstance().serialize(filters).array()
-      val encoded: String = BaseEncoding.base64().encode(serialized)
-      conf.set(PARQUET_FILTER_DATA, encoded)
-    }
-  }
-
-  /**
-   * Note: Inside the Hadoop API we only have access to `Configuration`, not to
-   * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
-   * the actual filter predicate.
-   */
-  def deserializeFilterExpressions(conf: Configuration): Seq[Expression] = {
-    val data = conf.get(PARQUET_FILTER_DATA)
-    if (data != null) {
-      val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
-      SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(decoded))
-    } else {
-      Seq()
-    }
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 8a9c0e733a9a14a625dbf418425261a7aba4ed2e..77d851ca486b348b957caf30d4c92e0b27533860 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -218,8 +218,8 @@ private[sql] class ParquetRelation(
     }
 
     // SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
-    val committerClassname = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
-    if (committerClassname == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
+    val committerClassName = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
+    if (committerClassName == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
       conf.set(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
         classOf[DirectParquetOutputCommitter].getCanonicalName)
     }
@@ -248,18 +248,22 @@ private[sql] class ParquetRelation(
     // bundled with `ParquetOutputFormat[Row]`.
     job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
 
-    // TODO There's no need to use two kinds of WriteSupport
-    // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
-    // complex types.
-    val writeSupportClass =
-      if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
-        classOf[MutableRowWriteSupport]
-      } else {
-        classOf[RowWriteSupport]
-      }
+    ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
+    CatalystWriteSupport.setSchema(dataSchema, conf)
+
+    // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
+    // and `CatalystWriteSupport` (writing actual rows to Parquet files).
+    conf.set(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sqlContext.conf.isParquetBinaryAsString.toString)
 
-    ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
-    RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
+    conf.set(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sqlContext.conf.isParquetINT96AsTimestamp.toString)
+
+    conf.set(
+      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
+      sqlContext.conf.writeLegacyParquetFormat.toString)
 
     // Sets compression scheme
     conf.set(
@@ -287,7 +291,6 @@ private[sql] class ParquetRelation(
     val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
-    val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat
 
     // Parquet row group size. We will use this value as the value for
     // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
@@ -304,8 +307,7 @@ private[sql] class ParquetRelation(
         useMetadataCache,
         parquetFilterPushDown,
         assumeBinaryIsString,
-        assumeInt96IsTimestamp,
-        writeLegacyParquetFormat) _
+        assumeInt96IsTimestamp) _
 
     // Create the function to set input paths at the driver side.
     val setInputPaths =
@@ -530,8 +532,7 @@ private[sql] object ParquetRelation extends Logging {
       useMetadataCache: Boolean,
       parquetFilterPushDown: Boolean,
       assumeBinaryIsString: Boolean,
-      assumeInt96IsTimestamp: Boolean,
-      writeLegacyParquetFormat: Boolean)(job: Job): Unit = {
+      assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
     val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
     conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
 
@@ -552,16 +553,15 @@ private[sql] object ParquetRelation extends Logging {
     })
 
     conf.set(
-      RowWriteSupport.SPARK_ROW_SCHEMA,
+      CatalystWriteSupport.SPARK_ROW_SCHEMA,
       CatalystSchemaConverter.checkFieldNames(dataSchema).json)
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
     conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
 
-    // Sets flags for Parquet schema conversion
+    // Sets flags for `CatalystSchemaConverter`
     conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
     conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
-    conf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, writeLegacyParquetFormat)
 
     overrideMinSplitSize(parquetBlockSize, conf)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
deleted file mode 100644
index ed89aa27aa1f0545e852e6770c7b0b4a12d69b10..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
+++ /dev/null
@@ -1,321 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import java.math.BigInteger
-import java.nio.{ByteBuffer, ByteOrder}
-import java.util.{HashMap => JHashMap}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.column.ParquetProperties
-import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.io.api._
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-
-/**
- * A `parquet.hadoop.api.WriteSupport` for Row objects.
- */
-private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Logging {
-
-  private[parquet] var writer: RecordConsumer = null
-  private[parquet] var attributes: Array[Attribute] = null
-
-  override def init(configuration: Configuration): WriteSupport.WriteContext = {
-    val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
-    val metadata = new JHashMap[String, String]()
-    metadata.put(CatalystReadSupport.SPARK_METADATA_KEY, origAttributesStr)
-
-    if (attributes == null) {
-      attributes = ParquetTypesConverter.convertFromString(origAttributesStr).toArray
-    }
-
-    log.debug(s"write support initialized for requested schema $attributes")
-    new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata)
-  }
-
-  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
-    writer = recordConsumer
-    log.debug(s"preparing for write with schema $attributes")
-  }
-
-  override def write(record: InternalRow): Unit = {
-    val attributesSize = attributes.size
-    if (attributesSize > record.numFields) {
-      throw new IndexOutOfBoundsException("Trying to write more fields than contained in row " +
-        s"($attributesSize > ${record.numFields})")
-    }
-
-    var index = 0
-    writer.startMessage()
-    while(index < attributesSize) {
-      // null values indicate optional fields but we do not check currently
-      if (!record.isNullAt(index)) {
-        writer.startField(attributes(index).name, index)
-        writeValue(attributes(index).dataType, record.get(index, attributes(index).dataType))
-        writer.endField(attributes(index).name, index)
-      }
-      index = index + 1
-    }
-    writer.endMessage()
-  }
-
-  private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
-    if (value != null) {
-      schema match {
-        case t: UserDefinedType[_] => writeValue(t.sqlType, value)
-        case t @ ArrayType(_, _) => writeArray(
-          t,
-          value.asInstanceOf[CatalystConverter.ArrayScalaType])
-        case t @ MapType(_, _, _) => writeMap(
-          t,
-          value.asInstanceOf[CatalystConverter.MapScalaType])
-        case t @ StructType(_) => writeStruct(
-          t,
-          value.asInstanceOf[CatalystConverter.StructScalaType])
-        case _ => writePrimitive(schema.asInstanceOf[AtomicType], value)
-      }
-    }
-  }
-
-  private[parquet] def writePrimitive(schema: DataType, value: Any): Unit = {
-    if (value != null) {
-      schema match {
-        case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean])
-        case ByteType => writer.addInteger(value.asInstanceOf[Byte])
-        case ShortType => writer.addInteger(value.asInstanceOf[Short])
-        case IntegerType | DateType => writer.addInteger(value.asInstanceOf[Int])
-        case LongType => writer.addLong(value.asInstanceOf[Long])
-        case TimestampType => writeTimestamp(value.asInstanceOf[Long])
-        case FloatType => writer.addFloat(value.asInstanceOf[Float])
-        case DoubleType => writer.addDouble(value.asInstanceOf[Double])
-        case StringType => writer.addBinary(
-          Binary.fromByteArray(value.asInstanceOf[UTF8String].getBytes))
-        case BinaryType => writer.addBinary(
-          Binary.fromByteArray(value.asInstanceOf[Array[Byte]]))
-        case DecimalType.Fixed(precision, _) =>
-          writeDecimal(value.asInstanceOf[Decimal], precision)
-        case _ => sys.error(s"Do not know how to writer $schema to consumer")
-      }
-    }
-  }
-
-  private[parquet] def writeStruct(
-      schema: StructType,
-      struct: CatalystConverter.StructScalaType): Unit = {
-    if (struct != null) {
-      val fields = schema.fields.toArray
-      writer.startGroup()
-      var i = 0
-      while(i < fields.length) {
-        if (!struct.isNullAt(i)) {
-          writer.startField(fields(i).name, i)
-          writeValue(fields(i).dataType, struct.get(i, fields(i).dataType))
-          writer.endField(fields(i).name, i)
-        }
-        i = i + 1
-      }
-      writer.endGroup()
-    }
-  }
-
-  private[parquet] def writeArray(
-      schema: ArrayType,
-      array: CatalystConverter.ArrayScalaType): Unit = {
-    val elementType = schema.elementType
-    writer.startGroup()
-    if (array.numElements() > 0) {
-      if (schema.containsNull) {
-        writer.startField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0)
-        var i = 0
-        while (i < array.numElements()) {
-          writer.startGroup()
-          if (!array.isNullAt(i)) {
-            writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
-            writeValue(elementType, array.get(i, elementType))
-            writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
-          }
-          writer.endGroup()
-          i = i + 1
-        }
-        writer.endField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0)
-      } else {
-        writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
-        var i = 0
-        while (i < array.numElements()) {
-          writeValue(elementType, array.get(i, elementType))
-          i = i + 1
-        }
-        writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0)
-      }
-    }
-    writer.endGroup()
-  }
-
-  private[parquet] def writeMap(
-      schema: MapType,
-      map: CatalystConverter.MapScalaType): Unit = {
-    writer.startGroup()
-    val length = map.numElements()
-    if (length > 0) {
-      writer.startField(CatalystConverter.MAP_SCHEMA_NAME, 0)
-      map.foreach(schema.keyType, schema.valueType, (key, value) => {
-        writer.startGroup()
-        writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
-        writeValue(schema.keyType, key)
-        writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0)
-        if (value != null) {
-          writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
-          writeValue(schema.valueType, value)
-          writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1)
-        }
-        writer.endGroup()
-      })
-      writer.endField(CatalystConverter.MAP_SCHEMA_NAME, 0)
-    }
-    writer.endGroup()
-  }
-
-  // Scratch array used to write decimals as fixed-length byte array
-  private[this] var reusableDecimalBytes = new Array[Byte](16)
-
-  private[parquet] def writeDecimal(decimal: Decimal, precision: Int): Unit = {
-    val numBytes = CatalystSchemaConverter.minBytesForPrecision(precision)
-
-    def longToBinary(unscaled: Long): Binary = {
-      var i = 0
-      var shift = 8 * (numBytes - 1)
-      while (i < numBytes) {
-        reusableDecimalBytes(i) = (unscaled >> shift).toByte
-        i += 1
-        shift -= 8
-      }
-      Binary.fromByteArray(reusableDecimalBytes, 0, numBytes)
-    }
-
-    def bigIntegerToBinary(unscaled: BigInteger): Binary = {
-      unscaled.toByteArray match {
-        case bytes if bytes.length == numBytes =>
-          Binary.fromByteArray(bytes)
-
-        case bytes if bytes.length <= reusableDecimalBytes.length =>
-          val signedByte = (if (bytes.head < 0) -1 else 0).toByte
-          java.util.Arrays.fill(reusableDecimalBytes, 0, numBytes - bytes.length, signedByte)
-          System.arraycopy(bytes, 0, reusableDecimalBytes, numBytes - bytes.length, bytes.length)
-          Binary.fromByteArray(reusableDecimalBytes, 0, numBytes)
-
-        case bytes =>
-          reusableDecimalBytes = new Array[Byte](bytes.length)
-          bigIntegerToBinary(unscaled)
-      }
-    }
-
-    val binary = if (numBytes <= 8) {
-      longToBinary(decimal.toUnscaledLong)
-    } else {
-      bigIntegerToBinary(decimal.toJavaBigDecimal.unscaledValue())
-    }
-
-    writer.addBinary(binary)
-  }
-
-  // array used to write Timestamp as Int96 (fixed-length binary)
-  private[this] val int96buf = new Array[Byte](12)
-
-  private[parquet] def writeTimestamp(ts: Long): Unit = {
-    val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(ts)
-    val buf = ByteBuffer.wrap(int96buf)
-    buf.order(ByteOrder.LITTLE_ENDIAN)
-    buf.putLong(timeOfDayNanos)
-    buf.putInt(julianDay)
-    writer.addBinary(Binary.fromByteArray(int96buf))
-  }
-}
-
-// Optimized for non-nested rows
-private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
-  override def write(record: InternalRow): Unit = {
-    val attributesSize = attributes.size
-    if (attributesSize > record.numFields) {
-      throw new IndexOutOfBoundsException("Trying to write more fields than contained in row " +
-        s"($attributesSize > ${record.numFields})")
-    }
-
-    var index = 0
-    writer.startMessage()
-    while(index < attributesSize) {
-      // null values indicate optional fields but we do not check currently
-      if (!record.isNullAt(index) && !record.isNullAt(index)) {
-        writer.startField(attributes(index).name, index)
-        consumeType(attributes(index).dataType, record, index)
-        writer.endField(attributes(index).name, index)
-      }
-      index = index + 1
-    }
-    writer.endMessage()
-  }
-
-  private def consumeType(
-      ctype: DataType,
-      record: InternalRow,
-      index: Int): Unit = {
-    ctype match {
-      case BooleanType => writer.addBoolean(record.getBoolean(index))
-      case ByteType => writer.addInteger(record.getByte(index))
-      case ShortType => writer.addInteger(record.getShort(index))
-      case IntegerType | DateType => writer.addInteger(record.getInt(index))
-      case LongType => writer.addLong(record.getLong(index))
-      case TimestampType => writeTimestamp(record.getLong(index))
-      case FloatType => writer.addFloat(record.getFloat(index))
-      case DoubleType => writer.addDouble(record.getDouble(index))
-      case StringType =>
-        writer.addBinary(Binary.fromByteArray(record.getUTF8String(index).getBytes))
-      case BinaryType =>
-        writer.addBinary(Binary.fromByteArray(record.getBinary(index)))
-      case DecimalType.Fixed(precision, scale) =>
-        writeDecimal(record.getDecimal(index, precision, scale), precision)
-      case _ => sys.error(s"Unsupported datatype $ctype, cannot write to consumer")
-    }
-  }
-}
-
-private[parquet] object RowWriteSupport {
-  val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes"
-
-  def getSchema(configuration: Configuration): Seq[Attribute] = {
-    val schemaString = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
-    if (schemaString == null) {
-      throw new RuntimeException("Missing schema!")
-    }
-    ParquetTypesConverter.convertFromString(schemaString)
-  }
-
-  def setSchema(schema: Seq[Attribute], configuration: Configuration) {
-    val encoded = ParquetTypesConverter.convertToString(schema)
-    configuration.set(SPARK_ROW_SCHEMA, encoded)
-    configuration.set(
-      ParquetOutputFormat.WRITER_VERSION,
-      ParquetProperties.WriterVersion.PARQUET_1_0.toString)
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
deleted file mode 100644
index b647bb6116afaaed0ad1222870f971fef4a494c3..0000000000000000000000000000000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet
-
-import java.io.IOException
-import java.util.{Collections, Arrays}
-
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.Job
-import org.apache.parquet.format.converter.ParquetMetadataConverter
-import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
-import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.types._
-
-
-private[parquet] object ParquetTypesConverter extends Logging {
-  def isPrimitiveType(ctype: DataType): Boolean = ctype match {
-    case _: NumericType | BooleanType | DateType | TimestampType | StringType | BinaryType => true
-    case _ => false
-  }
-
-  /**
-   * Compute the FIXED_LEN_BYTE_ARRAY length needed to represent a given DECIMAL precision.
-   */
-  private[parquet] val BYTES_FOR_PRECISION = Array.tabulate[Int](38) { precision =>
-    var length = 1
-    while (math.pow(2.0, 8 * length - 1) < math.pow(10.0, precision)) {
-      length += 1
-    }
-    length
-  }
-
-  def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
-    val converter = new CatalystSchemaConverter()
-    converter.convert(StructType.fromAttributes(attributes))
-  }
-
-  def convertFromString(string: String): Seq[Attribute] = {
-    Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) match {
-      case s: StructType => s.toAttributes
-      case other => sys.error(s"Can convert $string to row")
-    }
-  }
-
-  def convertToString(schema: Seq[Attribute]): String = {
-    schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
-    StructType.fromAttributes(schema).json
-  }
-
-  def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration): Unit = {
-    if (origPath == null) {
-      throw new IllegalArgumentException("Unable to write Parquet metadata: path is null")
-    }
-    val fs = origPath.getFileSystem(conf)
-    if (fs == null) {
-      throw new IllegalArgumentException(
-        s"Unable to write Parquet metadata: path $origPath is incorrectly formatted")
-    }
-    val path = origPath.makeQualified(fs)
-    if (fs.exists(path) && !fs.getFileStatus(path).isDir) {
-      throw new IllegalArgumentException(s"Expected to write to directory $path but found file")
-    }
-    val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
-    if (fs.exists(metadataPath)) {
-      try {
-        fs.delete(metadataPath, true)
-      } catch {
-        case e: IOException =>
-          throw new IOException(s"Unable to delete previous PARQUET_METADATA_FILE at $metadataPath")
-      }
-    }
-    val extraMetadata = new java.util.HashMap[String, String]()
-    extraMetadata.put(
-      CatalystReadSupport.SPARK_METADATA_KEY,
-      ParquetTypesConverter.convertToString(attributes))
-    // TODO: add extra data, e.g., table name, date, etc.?
-
-    val parquetSchema: MessageType = ParquetTypesConverter.convertFromAttributes(attributes)
-    val metaData: FileMetaData = new FileMetaData(
-      parquetSchema,
-      extraMetadata,
-      "Spark")
-
-    ParquetFileWriter.writeMetadataFile(
-      conf,
-      path,
-      Arrays.asList(new Footer(path, new ParquetMetadata(metaData, Collections.emptyList()))))
-  }
-
-  /**
-   * Try to read Parquet metadata at the given Path. We first see if there is a summary file
-   * in the parent directory. If so, this is used. Else we read the actual footer at the given
-   * location.
-   * @param origPath The path at which we expect one (or more) Parquet files.
-   * @param configuration The Hadoop configuration to use.
-   * @return The `ParquetMetadata` containing among other things the schema.
-   */
-  def readMetaData(origPath: Path, configuration: Option[Configuration]): ParquetMetadata = {
-    if (origPath == null) {
-      throw new IllegalArgumentException("Unable to read Parquet metadata: path is null")
-    }
-    val job = new Job()
-    val conf = {
-      // scalastyle:off jobcontext
-      configuration.getOrElse(ContextUtil.getConfiguration(job))
-      // scalastyle:on jobcontext
-    }
-    val fs: FileSystem = origPath.getFileSystem(conf)
-    if (fs == null) {
-      throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
-    }
-    val path = origPath.makeQualified(fs)
-
-    val children =
-      fs
-        .globStatus(path)
-        .flatMap { status => if (status.isDir) fs.listStatus(status.getPath) else List(status) }
-        .filterNot { status =>
-          val name = status.getPath.getName
-          (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
-        }
-
-    // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
-    // groups. Since Parquet schema is replicated among all row groups, we only need to touch a
-    // single row group to read schema related metadata. Notice that we are making assumptions that
-    // all data in a single Parquet file have the same schema, which is normally true.
-    children
-      // Try any non-"_metadata" file first...
-      .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE)
-      // ... and fallback to "_metadata" if no such file exists (which implies the Parquet file is
-      // empty, thus normally the "_metadata" file is expected to be fairly small).
-      .orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE))
-      .map(ParquetFileReader.readFooter(conf, _, ParquetMetadataConverter.NO_FILTER))
-      .getOrElse(
-        throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path"))
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index 7992fd59ff4ba1bb90549d2c8d3969dddf3790b0..d17671d48a2fcd5ee37d4ee32bbdce0e1421a51c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -24,6 +24,7 @@ import com.clearspring.analytics.stream.cardinality.HyperLogLog
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions.{OpenHashSetUDT, HyperLogLogUDT}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -68,7 +69,7 @@ private[sql] class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
   private[spark] override def asNullable: MyDenseVectorUDT = this
 }
 
-class UserDefinedTypeSuite extends QueryTest with SharedSQLContext {
+class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetTest {
   import testImplicits._
 
   private lazy val pointsRDD = Seq(
@@ -98,17 +99,28 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext {
       Seq(Row(true), Row(true)))
   }
 
-
-  test("UDTs with Parquet") {
-    val tempDir = Utils.createTempDir()
-    tempDir.delete()
-    pointsRDD.write.parquet(tempDir.getCanonicalPath)
+  testStandardAndLegacyModes("UDTs with Parquet") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      pointsRDD.write.parquet(path)
+      checkAnswer(
+        sqlContext.read.parquet(path),
+        Seq(
+          Row(1.0, new MyDenseVector(Array(0.1, 1.0))),
+          Row(0.0, new MyDenseVector(Array(0.2, 2.0)))))
+    }
   }
 
-  test("Repartition UDTs with Parquet") {
-    val tempDir = Utils.createTempDir()
-    tempDir.delete()
-    pointsRDD.repartition(1).write.parquet(tempDir.getCanonicalPath)
+  testStandardAndLegacyModes("Repartition UDTs with Parquet") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      pointsRDD.repartition(1).write.parquet(path)
+      checkAnswer(
+        sqlContext.read.parquet(path),
+        Seq(
+          Row(1.0, new MyDenseVector(Array(0.1, 1.0))),
+          Row(0.0, new MyDenseVector(Array(0.2, 2.0)))))
+    }
   }
 
   // Tests to make sure that all operators correctly convert types on the way out.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index cd552e83372f140eb5bee312d9bc061adc2f3c69..599cf948e76a090327fcab38875b9726098d1285 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -28,10 +28,10 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.apache.parquet.example.data.simple.SimpleGroup
 import org.apache.parquet.example.data.{Group, GroupWriter}
+import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.hadoop.metadata.{BlockMetaData, CompressionCodecName, FileMetaData, ParquetMetadata}
-import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter}
+import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
@@ -99,16 +99,18 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true")(checkParquetFile(data))
   }
 
-  test("fixed-length decimals") {
-    def makeDecimalRDD(decimal: DecimalType): DataFrame =
-      sparkContext
-        .parallelize(0 to 1000)
-        .map(i => Tuple1(i / 100.0))
-        .toDF()
-        // Parquet doesn't allow column names with spaces, have to add an alias here
-        .select($"_1" cast decimal as "dec")
+  testStandardAndLegacyModes("fixed-length decimals") {
+    def makeDecimalRDD(decimal: DecimalType): DataFrame = {
+      sqlContext
+        .range(1000)
+        // Parquet doesn't allow column names with spaces, have to add an alias here.
+        // Minus 500 here so that negative decimals are also tested.
+        .select((('id - 500) / 100.0) cast decimal as 'dec)
+        .coalesce(1)
+    }
 
-    for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17), (19, 0), (38, 37))) {
+    val combinations = Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17), (19, 0), (38, 37))
+    for ((precision, scale) <- combinations) {
       withTempPath { dir =>
         val data = makeDecimalRDD(DecimalType(precision, scale))
         data.write.parquet(dir.getCanonicalPath)
@@ -132,22 +134,22 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("map") {
+  testStandardAndLegacyModes("map") {
     val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
     checkParquetFile(data)
   }
 
-  test("array") {
+  testStandardAndLegacyModes("array") {
     val data = (1 to 4).map(i => Tuple1(Seq(i, i + 1)))
     checkParquetFile(data)
   }
 
-  test("array and double") {
+  testStandardAndLegacyModes("array and double") {
     val data = (1 to 4).map(i => (i.toDouble, Seq(i.toDouble, (i + 1).toDouble)))
     checkParquetFile(data)
   }
 
-  test("struct") {
+  testStandardAndLegacyModes("struct") {
     val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
     withParquetDataFrame(data) { df =>
       // Structs are converted to `Row`s
@@ -157,7 +159,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("nested struct with array of array as field") {
+  testStandardAndLegacyModes("nested struct with array of array as field") {
     val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
     withParquetDataFrame(data) { df =>
       // Structs are converted to `Row`s
@@ -167,7 +169,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
-  test("nested map with struct as value type") {
+  testStandardAndLegacyModes("nested map with struct as value type") {
     val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i"))))
     withParquetDataFrame(data) { df =>
       checkAnswer(df, data.map { case Tuple1(m) =>
@@ -205,14 +207,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
   }
 
   test("compression codec") {
-    def compressionCodecFor(path: String): String = {
-      val codecs = ParquetTypesConverter
-        .readMetaData(new Path(path), Some(hadoopConfiguration)).getBlocks.asScala
-        .flatMap(_.getColumns.asScala)
-        .map(_.getCodec.name())
-        .distinct
-
-      assert(codecs.size === 1)
+    def compressionCodecFor(path: String, codecName: String): String = {
+      val codecs = for {
+        footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConfiguration)
+        block <- footer.getParquetMetadata.getBlocks.asScala
+        column <- block.getColumns.asScala
+      } yield column.getCodec.name()
+
+      assert(codecs.distinct === Seq(codecName))
       codecs.head
     }
 
@@ -222,7 +224,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
         withParquetFile(data) { path =>
           assertResult(sqlContext.conf.parquetCompressionCodec.toUpperCase) {
-            compressionCodecFor(path)
+            compressionCodecFor(path, codec.name())
           }
         }
       }
@@ -278,15 +280,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     withTempPath { file =>
       val path = new Path(file.toURI.toString)
       val fs = FileSystem.getLocal(hadoopConfiguration)
-      val attributes = ScalaReflection.attributesFor[(Int, String)]
-      ParquetTypesConverter.writeMetaData(attributes, path, hadoopConfiguration)
+      val schema = StructType.fromAttributes(ScalaReflection.attributesFor[(Int, String)])
+      writeMetadata(schema, path, hadoopConfiguration)
 
       assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)))
       assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
 
-      val metaData = ParquetTypesConverter.readMetaData(path, Some(hadoopConfiguration))
-      val actualSchema = metaData.getFileMetaData.getSchema
-      val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes)
+      val expectedSchema = new CatalystSchemaConverter().convert(schema)
+      val actualSchema = readFooter(path, hadoopConfiguration).getFileMetaData.getSchema
 
       actualSchema.checkContains(expectedSchema)
       expectedSchema.checkContains(actualSchema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 1c1cfa34ad04b63cbaa457502e12afcf26414902..cc02ef81c9f8bd2ae060ee77aa58a7ffc5e5b728 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -484,7 +484,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
-  test("SPARK-10301 requested schema clipping - UDT") {
+  testStandardAndLegacyModes("SPARK-10301 requested schema clipping - UDT") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
 
@@ -517,6 +517,50 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
         Row(Row(NestedStruct(1, 2L, 3.5D))))
     }
   }
+
+  test("expand UDT in StructType") {
+    val schema = new StructType().add("n", new NestedStructUDT, nullable = true)
+    val expected = new StructType().add("n", new NestedStructUDT().sqlType, nullable = true)
+    assert(CatalystReadSupport.expandUDT(schema) === expected)
+  }
+
+  test("expand UDT in ArrayType") {
+    val schema = new StructType().add(
+      "n",
+      ArrayType(
+        elementType = new NestedStructUDT,
+        containsNull = false),
+      nullable = true)
+
+    val expected = new StructType().add(
+      "n",
+      ArrayType(
+        elementType = new NestedStructUDT().sqlType,
+        containsNull = false),
+      nullable = true)
+
+    assert(CatalystReadSupport.expandUDT(schema) === expected)
+  }
+
+  test("expand UDT in MapType") {
+    val schema = new StructType().add(
+      "n",
+      MapType(
+        keyType = IntegerType,
+        valueType = new NestedStructUDT,
+        valueContainsNull = false),
+      nullable = true)
+
+    val expected = new StructType().add(
+      "n",
+      MapType(
+        keyType = IntegerType,
+        valueType = new NestedStructUDT().sqlType,
+        valueContainsNull = false),
+      nullable = true)
+
+    assert(CatalystReadSupport.expandUDT(schema) === expected)
+  }
 }
 
 object TestingUDT {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index f17fb36f25fe8eed620e4df683bcdadf51a55f25..60fa81b1ab819fb07e876556f4eb84a6a9b163c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -357,8 +357,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     val jsonString = """{"type":"struct","fields":[{"name":"c1","type":"integer","nullable":false,"metadata":{}},{"name":"c2","type":"binary","nullable":true,"metadata":{}}]}"""
     // scalastyle:on
 
-    val fromCaseClassString = ParquetTypesConverter.convertFromString(caseClassString)
-    val fromJson = ParquetTypesConverter.convertFromString(jsonString)
+    val fromCaseClassString = StructType.fromString(caseClassString)
+    val fromJson = StructType.fromString(jsonString)
 
     (fromCaseClassString, fromJson).zipped.foreach { (a, b) =>
       assert(a.name == b.name)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 442fafb12f20017332ab9f9082862844cd7e1147..9840ad919e510b590480b68985040a85a3b34e69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -19,11 +19,19 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
 
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.metadata.{BlockMetaData, FileMetaData, ParquetMetadata}
+import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
+
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLConf, SaveMode}
 
 /**
  * A helper trait that provides convenient facilities for Parquet testing.
@@ -97,4 +105,38 @@ private[sql] trait ParquetTest extends SQLTestUtils {
     assert(partDir.mkdirs(), s"Couldn't create directory $partDir")
     partDir
   }
+
+  protected def writeMetadata(
+      schema: StructType, path: Path, configuration: Configuration): Unit = {
+    val parquetSchema = new CatalystSchemaConverter().convert(schema)
+    val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schema.json).asJava
+    val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}"
+    val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, createdBy)
+    val parquetMetadata = new ParquetMetadata(fileMetadata, Seq.empty[BlockMetaData].asJava)
+    val footer = new Footer(path, parquetMetadata)
+    ParquetFileWriter.writeMetadataFile(configuration, path, Seq(footer).asJava)
+  }
+
+  protected def readAllFootersWithoutSummaryFiles(
+      path: Path, configuration: Configuration): Seq[Footer] = {
+    val fs = path.getFileSystem(configuration)
+    ParquetFileReader.readAllFootersInParallel(configuration, fs.getFileStatus(path)).asScala.toSeq
+  }
+
+  protected def readFooter(path: Path, configuration: Configuration): ParquetMetadata = {
+    ParquetFileReader.readFooter(
+      configuration,
+      new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE),
+      ParquetMetadataConverter.NO_FILTER)
+  }
+
+  protected def testStandardAndLegacyModes(testName: String)(f: => Unit): Unit = {
+    test(s"Standard mode - $testName") {
+      withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") { f }
+    }
+
+    test(s"Legacy mode - $testName") {
+      withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") { f }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 107457f79ec03fe8e1f82436ce17719075378757..d63f3d399652356376b63a2494925d5307993bb7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -20,11 +20,11 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.hive.client.{ExternalTable, ManagedTable}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
+import org.apache.spark.sql.{SQLConf, QueryTest, Row, SaveMode}
 
 class HiveMetastoreCatalogSuite extends SparkFunSuite with TestHiveSingleton {
   import hiveContext.implicits._
@@ -74,11 +74,13 @@ class DataSourceWithHiveMetastoreCatalogSuite
   ).foreach { case (provider, (inputFormat, outputFormat, serde)) =>
     test(s"Persist non-partitioned $provider relation into metastore as managed table") {
       withTable("t") {
-        testDF
-          .write
-          .mode(SaveMode.Overwrite)
-          .format(provider)
-          .saveAsTable("t")
+        withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") {
+          testDF
+            .write
+            .mode(SaveMode.Overwrite)
+            .format(provider)
+            .saveAsTable("t")
+        }
 
         val hiveTable = catalog.client.getTable("default", "t")
         assert(hiveTable.inputFormat === Some(inputFormat))
@@ -102,12 +104,14 @@ class DataSourceWithHiveMetastoreCatalogSuite
         withTable("t") {
           val path = dir.getCanonicalFile
 
-          testDF
-            .write
-            .mode(SaveMode.Overwrite)
-            .format(provider)
-            .option("path", path.toString)
-            .saveAsTable("t")
+          withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") {
+            testDF
+              .write
+              .mode(SaveMode.Overwrite)
+              .format(provider)
+              .option("path", path.toString)
+              .saveAsTable("t")
+          }
 
           val hiveTable = catalog.client.getTable("default", "t")
           assert(hiveTable.inputFormat === Some(inputFormat))