diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 60a9d1f020b42796d9625fce71b6eab03ecaa657..e6fc9749c72670d0be7fedc8a00678bb8d7ea0eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -21,8 +21,6 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.fs.Path
-
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
@@ -243,7 +241,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private def insertInto(tableIdent: TableIdentifier): Unit = {
     assertNotBucketed("insertInto")
-    val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap)
+
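+    // Partition columns come from the target table's definition, so an explicit partitionBy() is rejected.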
+    if (partitioningColumns.isDefined) {
+      throw new AnalysisException(
+        "insertInto() can't be used together with partitionBy(). " +
+          "Partition columns are defined by the table into which is being inserted."
+      )
+    }
+
+    val partitions = normalizedParCols.map(_.map(col => col -> Option.empty[String]).toMap)
     val overwrite = mode == SaveMode.Overwrite
 
     // A partitioned relation's schema can be different from the input logicalPlan, since
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 3bf45ced75e0aa7280dab613b7eeb125fa7f7119..b890b4bffdcfe51b1c9748d17e2de3e1325cf404 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -346,6 +346,44 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
     }
   }
 
+  private def testPartitionedHiveSerDeTable(testName: String)(f: String => Unit): Unit = {
+    test(s"Hive SerDe table - $testName") {
+      val hiveTable = "hive_table"
+
+      withTable(hiveTable) {
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          sql(s"CREATE TABLE $hiveTable (a INT) PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE")
+          f(hiveTable)
+        }
+      }
+    }
+  }
+
+  private def testPartitionedDataSourceTable(testName: String)(f: String => Unit): Unit = {
+    test(s"Data source table - $testName") {
+      val dsTable = "ds_table"
+
+      withTable(dsTable) {
+        sql(s"CREATE TABLE $dsTable (a INT, b INT, c INT) USING PARQUET PARTITIONED BY (b, c)")
+        f(dsTable)
+      }
+    }
+  }
+
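+  // Registers the test against both a partitioned Hive SerDe table and a partitioned data source table.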
+  private def testPartitionedTable(testName: String)(f: String => Unit): Unit = {
+    testPartitionedHiveSerDeTable(testName)(f)
+    testPartitionedDataSourceTable(testName)(f)
+  }
+
+  testPartitionedTable("partitionBy() can't be used together with insertInto()") { tableName =>
+    val cause = intercept[AnalysisException] {
+      Seq((1, 2, 3)).toDF("a", "b", "c").write.partitionBy("b", "c").insertInto(tableName)
+    }
+
+    assert(cause.getMessage.contains("insertInto() can't be used together with partitionBy()."))
+  }
+
   test("InsertIntoTable#resolved should include dynamic partitions") {
     withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
       sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")