diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index c3c4e2925b90c67310ed10aad90bf818dfc4df58..2843100fb3b36a8b74fadcb393f2208058a6b96a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.SparkException
@@ -257,7 +258,28 @@ case class InsertIntoHiveTable(
             table.catalogTable.identifier.table,
             partitionSpec)
 
+        var doHiveOverwrite = overwrite
+
         if (oldPart.isEmpty || !ifNotExists) {
+          // SPARK-18107: Insert overwrite runs much slower in Spark SQL than in hive-client.
+          // Newer Hive versions largely improve insert overwrite performance, but Spark
+          // bundles an older Hive and may not catch up with every new release. Instead, we
+          // delete the Hive partition directory first and then load the data files into it.
+          if (oldPart.nonEmpty && overwrite) {
+            oldPart.get.storage.locationUri.foreach { uri =>
+              val partitionPath = new Path(uri)
+              val fs = partitionPath.getFileSystem(hadoopConf)
+              if (fs.exists(partitionPath)) {
+                if (!fs.delete(partitionPath, true)) {
+                  throw new RuntimeException(
+                    "Cannot remove partition directory '" + partitionPath.toString)
+                }
+                // Don't let Hive do overwrite operation since it is slower.
+                doHiveOverwrite = false
+              }
+            }
+          }
+
           // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
           // which is currently considered as a Hive native command.
           val inheritTableSpecs = true
@@ -266,7 +288,7 @@ case class InsertIntoHiveTable(
             table.catalogTable.identifier.table,
             outputPath.toString,
             partitionSpec,
-            isOverwrite = overwrite,
+            isOverwrite = doHiveOverwrite,
             holdDDLTime = holdDDLTime,
             inheritTableSpecs = inheritTableSpecs)
         }
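
The change above boils down to one pattern: when overwriting an existing partition, delete its directory through Hadoop's FileSystem API and then ask Hive to load the data with overwrite disabled. A minimal standalone sketch of that pattern follows; the FileSystem calls (getFileSystem, exists, delete) are the real Hadoop API, while loadIntoPartition is a hypothetical stand-in for the Hive client's loadPartition call.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object OverwriteSketch {
  // `loadIntoPartition` is a hypothetical placeholder for Hive's loadPartition
  // call; it receives the effective isOverwrite flag.
  def overwritePartition(
      hadoopConf: Configuration,
      partitionUri: String,
      loadIntoPartition: Boolean => Unit): Unit = {
    val partitionPath = new Path(partitionUri)
    val fs = partitionPath.getFileSystem(hadoopConf)
    var doHiveOverwrite = true
    if (fs.exists(partitionPath)) {
      // Recursively delete the old partition directory ourselves. Failing here
      // must abort the job: loading into a half-removed partition would leave
      // the table in an inconsistent state.
      if (!fs.delete(partitionPath, true)) {
        throw new RuntimeException(
          s"Cannot remove partition directory '$partitionPath'")
      }
      // The directory is already gone, so Hive can skip its slow overwrite path.
      doHiveOverwrite = false
    }
    loadIntoPartition(doHiveOverwrite)
  }
}
```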
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f64010a64b016adff6ec85c9dbe7267d0e83817f..8b916932ff543beba561c0f9f9a5e8bbe71c8e8d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1973,6 +1973,39 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("Insert overwrite with partition") {
+    withTable("tableWithPartition") {
+      sql(
+        """
+          |CREATE TABLE tableWithPartition (key int, value STRING)
+          |PARTITIONED BY (part STRING)
+        """.stripMargin)
+      sql(
+        """
+          |INSERT OVERWRITE TABLE tableWithPartition PARTITION (part = '1')
+          |SELECT * FROM default.src
+        """.stripMargin)
+      checkAnswer(
+        sql("SELECT part, key, value FROM tableWithPartition"),
+        sql("SELECT '1' AS part, key, value FROM default.src")
+      )
+
+      sql(
+        """
+          |INSERT OVERWRITE TABLE tableWithPartition PARTITION (part = '1')
+          |SELECT * FROM VALUES (1, "one"), (2, "two"), (3, null) AS data(key, value)
+        """.stripMargin)
+      checkAnswer(
+        sql("SELECT part, key, value FROM tableWithPartition"),
+        sql(
+          """
+            |SELECT '1' AS part, key, value FROM VALUES
+            |(1, "one"), (2, "two"), (3, null) AS data(key, value)
+          """.stripMargin)
+      )
+    }
+  }
+
   def testCommandAvailable(command: String): Boolean = {
     val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
     attempt.isSuccess && attempt.get == 0
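
The second INSERT OVERWRITE in the new test is what exercises the changed branch: the partition already exists, so its directory is removed via the FileSystem API and Hive then loads with isOverwrite = false. A hedged way to reproduce this interactively, e.g. in spark-shell with Hive support enabled (the table name is illustrative, not from the patch; the SQL mirrors the test's own syntax):

```scala
// First write creates the partition; Hive's normal load path is used.
spark.sql("CREATE TABLE t (key INT, value STRING) PARTITIONED BY (part STRING)")
spark.sql("""INSERT OVERWRITE TABLE t PARTITION (part = '1')
             SELECT * FROM VALUES (1, 'one') AS data(key, value)""")
// Overwriting the same partition again hits the new delete-then-load path
// instead of Hive's slower native overwrite.
spark.sql("""INSERT OVERWRITE TABLE t PARTITION (part = '1')
             SELECT * FROM VALUES (2, 'two') AS data(key, value)""")
spark.sql("SELECT part, key, value FROM t").show()
```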