diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 27ac398063d4327228097cfdcb618ec451050bf3..04bf5d9b0f931efee30485672827cd40cdd0d8dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -799,14 +799,15 @@ class DataFrame protected[sql](
    * Returns the number of rows in the [[DataFrame]].
    * @group action
    */
-  override def count(): Long = groupBy().count().rdd.collect().head.getLong(0)
+  override def count(): Long = groupBy().count().collect().head.getLong(0)
 
   /**
    * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
    * @group rdd
    */
   override def repartition(numPartitions: Int): DataFrame = {
-    sqlContext.createDataFrame(rdd.repartition(numPartitions), schema)
+    sqlContext.createDataFrame(
+      queryExecution.toRdd.map(_.copy()).repartition(numPartitions), schema)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 73162b22fa9cd3943c6f29ae02149290f6e5728a..ffe388cfa95329001cf2440d6a18474e15d080aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -168,6 +168,7 @@ package object debug {
       case (_: Short, ShortType) =>
       case (_: Boolean, BooleanType) =>
       case (_: Double, DoubleType) =>
+      case (v, udt: UserDefinedType[_]) => typeCheck(v, udt.sqlType)
 
       case (d, t) => sys.error(s"Invalid data found: got $d (${d.getClass}) expected $t")
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index 5f21d990e2e5beb852497508c0f4ec58d0a321e0..9c098df24c65f5e901943b5cf4c30b92d99230c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.sql
 
+import java.io.File
+
 import scala.beans.{BeanInfo, BeanProperty}
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext.{sparkContext, sql}
 import org.apache.spark.sql.test.TestSQLContext.implicits._
@@ -91,4 +92,17 @@ class UserDefinedTypeSuite extends QueryTest {
       sql("SELECT testType(features) from points"),
       Seq(Row(true), Row(true)))
   }
+
+
+  test("UDTs with Parquet") {
+    val tempDir = File.createTempFile("parquet", "test")
+    tempDir.delete()
+    pointsRDD.saveAsParquetFile(tempDir.getCanonicalPath)
+  }
+
+  test("Repartition UDTs with Parquet") {
+    val tempDir = File.createTempFile("parquet", "test")
+    tempDir.delete()
+    pointsRDD.repartition(1).saveAsParquetFile(tempDir.getCanonicalPath)
+  }
 }