diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 7b030b7d73bd569762dc70c06589f2c4188ea820..84eef0f8a672ce990fe4179d549ea4987f0b6fa1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.execution.{FileRelation, RDDConversions}
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, PartitionSpec, Partition}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql._
 import org.apache.spark.util.SerializableConfiguration
 
@@ -544,11 +544,32 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   }
 
   private def discoverPartitions(): PartitionSpec = {
-    val typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled()
     // We use leaf dirs containing data files to discover the schema.
     val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq
-    PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME,
-      typeInference)
+    userDefinedPartitionColumns match {
+      case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
+        val spec = PartitioningUtils.parsePartitions(
+          leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference = false)
+
+        // Without auto inference, all of value in the `row` should be null or in StringType,
+        // we need to cast into the data type that user specified.
+        def castPartitionValuesToUserSchema(row: InternalRow) = {
+          InternalRow((0 until row.numFields).map { i =>
+            Cast(
+              Literal.create(row.getString(i), StringType),
+              userProvidedSchema.fields(i).dataType).eval()
+          }: _*)
+        }
+
+        PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
+          part.copy(values = castPartitionValuesToUserSchema(part.values))
+        })
+
+      case _ =>
+        // user did not provide a partitioning schema
+        PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME,
+          typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled())
+    }
   }
 
   /**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 42b9b3d6340d8d3e672a83f6fe0e2bca1ab53a1a..e3605bb3f6bf0461b136ba916ac6b789d5a815a1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -510,21 +510,39 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
 
-  // HadoopFsRelation.discoverPartitions() called by refresh(), which will ignore
-  // the given partition data type.
-  ignore("Partition column type casting") {
+  test("SPARK-9735 Partition column type casting") {
     withTempPath { file =>
-      val input = partitionedTestDF.select('a, 'b, 'p1.cast(StringType).as('ps), 'p2)
-
-      input
-        .write
-        .format(dataSourceName)
-        .mode(SaveMode.Overwrite)
-        .partitionBy("ps", "p2")
-        .saveAsTable("t")
+      val df = (for {
+        i <- 1 to 3
+        p2 <- Seq("foo", "bar")
+      } yield (i, s"val_$i", 1.0d, p2, 123, 123.123f)).toDF("a", "b", "p1", "p2", "p3", "f")
+
+      val input = df.select(
+        'a,
+        'b,
+        'p1.cast(StringType).as('ps1),
+        'p2,
+        'p3.cast(FloatType).as('pf1),
+        'f)
 
       withTempTable("t") {
-        checkAnswer(sqlContext.table("t"), input.collect())
+        input
+          .write
+          .format(dataSourceName)
+          .mode(SaveMode.Overwrite)
+          .partitionBy("ps1", "p2", "pf1", "f")
+          .saveAsTable("t")
+
+        input
+          .write
+          .format(dataSourceName)
+          .mode(SaveMode.Append)
+          .partitionBy("ps1", "p2", "pf1", "f")
+          .saveAsTable("t")
+
+        val realData = input.collect()
+
+        checkAnswer(sqlContext.table("t"), realData ++ realData)
       }
     }
   }