diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 66dfcc308ceca8d6c7cdf58a1b992655903410d7..0a2007e15843c8994d8281d6700e10e05094905e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -26,6 +26,7 @@ import scala.util.Try
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.util.Shell
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.types._
@@ -270,6 +271,18 @@ private[sql] object PartitioningUtils {
   private val upCastingOrder: Seq[DataType] =
     Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
 
+  def validatePartitionColumnDataTypes(
+      schema: StructType,
+      partitionColumns: Array[String]): Unit = {
+
+    ResolvedDataSource.partitionColumnsSchema(schema, partitionColumns).foreach { field =>
+      field.dataType match {
+        case _: AtomicType => // OK
+        case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
+      }
+    }
+  }
+
   /**
    * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
    * types.
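
The new validatePartitionColumnDataTypes check accepts any AtomicType (roughly the scalar SQL types: numeric, string, boolean, binary, date and timestamp) and throws an AnalysisException for anything else, so arrays, maps and structs can no longer be used as dynamic partition columns. A minimal sketch of the behavior, assuming a caller that can see the private[sql] object; the schema and column names below are invented for illustration:

    import org.apache.spark.sql.execution.datasources.PartitioningUtils
    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("a", IntegerType),             // atomic: fine as a partition column
      StructField("b", StringType),              // atomic: fine
      StructField("c", ArrayType(IntegerType)))) // complex: rejected

    PartitioningUtils.validatePartitionColumnDataTypes(schema, Array("a", "b")) // passes
    PartitioningUtils.validatePartitionColumnDataTypes(schema, Array("c"))      // AnalysisException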
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 7770bbd712f04d8d31e73dc0f8a98e5423e90b4c..8fbaf3a3059dbdc37447428410f1bd35d911205d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -143,7 +143,7 @@ object ResolvedDataSource extends Logging {
     new ResolvedDataSource(clazz, relation)
   }
 
-  private def partitionColumnsSchema(
+  def partitionColumnsSchema(
       schema: StructType,
       partitionColumns: Array[String]): StructType = {
     StructType(partitionColumns.map { col =>
@@ -179,6 +179,9 @@ object ResolvedDataSource extends Logging {
           val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
           path.makeQualified(fs.getUri, fs.getWorkingDirectory)
         }
+
+        PartitioningUtils.validatePartitionColumnDataTypes(data.schema, partitionColumns)
+
         val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
         val r = dataSource.createRelation(
           sqlContext,
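
partitionColumnsSchema loses its private modifier so that PartitioningUtils can reuse it, and the validation is invoked in ResolvedDataSource.apply before createRelation, so a bad partition column now fails during analysis rather than partway through a write job. A hedged sketch of the user-facing effect (hypothetical output path, assuming import sqlContext.implicits._ is in scope):

    // A map-typed column can no longer be used with partitionBy: the save below is
    // expected to throw AnalysisException up front instead of failing mid-write.
    val df = Seq((1, Map("k" -> "v"))).toDF("a", "m")
    df.write.format("parquet").partitionBy("m").save("/tmp/partitioned-out")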
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 2f11f4042240251d8e2b059fb2502334d23e833e..d36197e50d4488bf0b0fdad885680cf8fd87636d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -287,7 +287,7 @@ private[sql] class DynamicPartitionWriterContainer(
           PartitioningUtils.escapePathName _, StringType, Seq(Cast(c, StringType)), Seq(StringType))
       val str = If(IsNull(c), Literal(defaultPartitionName), escaped)
       val partitionName = Literal(c.name + "=") :: str :: Nil
-      if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName
+      if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
     }
 
     // Returns the partition path given a partition key.
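
The separator change is cosmetic: Hadoop's Path.SEPARATOR is already the String "/", so the Char-to-String conversion through SEPARATOR_CHAR was redundant and the generated partition paths (for example "b=v1/c=1") are unchanged. A small sanity check, not part of the patch:

    // Both constants come from org.apache.hadoop.fs.Path and denote the same separator.
    assert(org.apache.hadoop.fs.Path.SEPARATOR == org.apache.hadoop.fs.Path.SEPARATOR_CHAR.toString)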
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 40ca8bf4095d88f9c7b9f2bf39f4dfa2aa207427..9d3d35692ffccac13b4a411f74ccd1ea9b280175 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -116,6 +116,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
           // OK
         }
 
+        PartitioningUtils.validatePartitionColumnDataTypes(r.schema, part.keySet.toArray)
+
         // Get all input data source relations of the query.
         val srcRelations = query.collect {
           case LogicalRelation(src: BaseRelation) => src
@@ -138,10 +140,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
           // OK
         }
 
-      case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
+      case CreateTableUsingAsSelect(tableName, _, _, partitionColumns, mode, _, query) =>
         // When the SaveMode is Overwrite, we need to check if the table is an input table of
         // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
-        if (catalog.tableExists(Seq(tableName))) {
+        if (mode == SaveMode.Overwrite && catalog.tableExists(Seq(tableName))) {
           // Need to remove SubQuery operator.
           EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match {
             // Only do the check if the table is a data source table
@@ -164,6 +166,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
           // OK
         }
 
+        PartitioningUtils.validatePartitionColumnDataTypes(query.schema, partitionColumns)
+
       case _ => // OK
     }
   }
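
Two things change in PreWriteCheck: partition column data types are now validated both in the InsertIntoTable branch (using r.schema and the partition spec keys) and in the CreateTableUsingAsSelect branch (using query.schema and partitionColumns), and the CTAS case now matches every SaveMode so that the new validation always runs, while the existing "cannot overwrite a table that is also read by the query" check keeps its previous scope behind the mode == SaveMode.Overwrite guard. A hedged sketch of a plan the widened match now rejects (hypothetical table name, assuming import sqlContext.implicits._ and org.apache.spark.sql.SaveMode):

    // Appending with an array-typed partition column is rejected at analysis time,
    // even though the save mode is not Overwrite.
    Seq((1, Seq(1, 2, 3))).toDF("a", "c")
      .write.partitionBy("c").mode(SaveMode.Append).saveAsTable("t")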
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index af445626fbe4dfa329510afa8b582414ba839d52..8d0d9218ddd6a4e77f5737349526eb285a3949ab 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.sources
 
+import java.sql.Date
+
 import scala.collection.JavaConversions._
 
 import org.apache.hadoop.conf.Configuration
@@ -553,6 +555,21 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
     }
   }
+
+  test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") {
+    val df = Seq(
+      (1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")),
+      (2, "v2", Array(4, 5, 6), Map("k2" -> "v2"), Tuple2(2, "5")),
+      (3, "v3", Array(7, 8, 9), Map("k3" -> "v3"), Tuple2(3, "6"))).toDF("a", "b", "c", "d", "e")
+    withTempDir { file =>
+      intercept[AnalysisException] {
+        df.write.format(dataSourceName).partitionBy("c", "d", "e").save(file.getCanonicalPath)
+      }
+    }
+    intercept[AnalysisException] {
+      df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
+    }
+  }
 }
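
For reference, the columns the new test partitions by are all non-atomic: c is an array, d a map, and e (a Tuple2) a struct, so both the path-based save and the metastore-backed saveAsTable are expected to trip the new check. A hedged sketch of how the schema could be inspected, assuming import sqlContext.implicits._ (not part of the test):

    val df = Seq((1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), (1, "4"))).toDF("a", "b", "c", "d", "e")
    // Prints roughly: a: int, b: string, c: array<int>, d: map<string,string>, e: struct<_1:int,_2:string>
    df.schema.foreach(f => println(s"${f.name}: ${f.dataType.simpleString}"))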
 
 // This class is used to test SPARK-8578. We should not use any custom output committer when