diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 14dd707fa0f1c4e2e8d67b389b5d1c4c0d505916..259008f183b56bd66f43c0b509318340aedb3401 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -154,7 +154,8 @@ abstract class ExternalCatalog {
       table: String,
       parts: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit
+      purge: Boolean,
+      retainData: Boolean): Unit
 
   /**
    * Override the specs of one or many existing table partitions, assuming they exist.
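The new `retainData` flag is orthogonal to `purge`: `purge` controls whether Hive bypasses the trash when it deletes partition data, while `retainData` asks the catalog to drop only the partition metadata and leave the files on disk untouched. A minimal sketch of the intended call-site semantics, in the same positional style the test suites below use (the spec value is illustrative; `TablePartitionSpec` is a `Map[String, String]` of column name to value):

```scala
// Metadata-only drop: remove the partition entry but keep its files.
catalog.dropPartitions(
  "db1", "tbl", Seq(Map("part" -> "1")),
  ignoreIfNotExists = true,
  purge = false,      // moot here: no data is deleted at all
  retainData = true)  // leave the partition directory in place
```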
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index a3ffeaa63f690ba833beb1fcb093bb20f22ea07c..880a7a0dc42253c4c3a882c35f0509f21fec0efd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -385,7 +385,8 @@ class InMemoryCatalog(
       table: String,
       partSpecs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = synchronized {
+      purge: Boolean,
+      retainData: Boolean): Unit = synchronized {
     requireTableExists(db, table)
     val existingParts = catalog(db).tables(table).partitions
     if (!ignoreIfNotExists) {
@@ -395,7 +396,12 @@ class InMemoryCatalog(
       }
     }
 
-    val shouldRemovePartitionLocation = getTable(db, table).tableType == CatalogTableType.MANAGED
+    val shouldRemovePartitionLocation = if (retainData) {
+      false
+    } else {
+      getTable(db, table).tableType == CatalogTableType.MANAGED
+    }
+
     // TODO: we should follow hive to roll back if one partition path failed to delete, and support
     // partial partition spec.
     partSpecs.foreach { p =>
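The new guard in `InMemoryCatalog` is a plain short-circuit: partition files are removed only when the caller did not ask to retain them and the table is managed, i.e. the catalog owns its storage. The if/else above is equivalent to this one-line form:

```scala
// Never delete files when retainData is set; otherwise delete them only
// for MANAGED tables, whose storage the catalog is responsible for.
val shouldRemovePartitionLocation =
  !retainData && getTable(db, table).tableType == CatalogTableType.MANAGED
```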
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0b6a91fff71fe6d64ff7c38fd71ffd2a89f8b0f4..da3a2079f42d34a91e8547882fdcca3aabcbf107 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -687,13 +687,14 @@ class SessionCatalog(
       tableName: TableIdentifier,
       specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = {
+      purge: Boolean,
+      retainData: Boolean): Unit = {
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
     requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
-    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge)
+    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
   }
 
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 303a8662d3f4deab0c5b63f6acbc5b7d26b45e11..3b39f420af494cb82fa136bee98af276a7ef0fa7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -361,13 +361,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val catalog = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
     catalog.dropPartitions(
-      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
+      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
     resetState()
     val catalog2 = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
     catalog2.dropPartitions(
-      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false)
+      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false,
+      retainData = false)
     assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
   }
 
@@ -375,11 +376,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false)
+        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false,
+        retainData = false)
     }
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false)
+        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false,
+        retainData = false)
     }
   }
 
@@ -387,10 +390,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false)
+        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false,
+        retainData = false)
     }
     catalog.dropPartitions(
-      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false)
+      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false, retainData = false)
   }
 
   test("get partition") {
@@ -713,7 +717,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(exists(tableLocation, "partCol1=5", "partCol2=6"))
 
     catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false,
-      purge = false)
+      purge = false, retainData = false)
     assert(!exists(tableLocation, "partCol1=3", "partCol2=4"))
     assert(!exists(tableLocation, "partCol1=5", "partCol2=6"))
 
@@ -745,7 +749,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val fs = partPath.getFileSystem(new Configuration)
     assert(fs.exists(partPath))
 
-    catalog.dropPartitions("db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
+    catalog.dropPartitions(
+      "db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
     assert(fs.exists(partPath))
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 3f27160d639344eade84041288280b34cc09c63a..f9c4b2687bf7a1892deee46c8cc57bd2be96b7a4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -618,7 +618,8 @@ class SessionCatalogSuite extends SparkFunSuite {
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec),
       ignoreIfNotExists = false,
-      purge = false)
+      purge = false,
+      retainData = false)
     assert(catalogPartitionsEqual(externalCatalog.listPartitions("db2", "tbl2"), part2))
     // Drop partitions without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
@@ -626,7 +627,8 @@ class SessionCatalogSuite extends SparkFunSuite {
       TableIdentifier("tbl2"),
       Seq(part2.spec),
       ignoreIfNotExists = false,
-      purge = false)
+      purge = false,
+      retainData = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
     // Drop multiple partitions at once
     sessionCatalog.createPartitions(
@@ -636,7 +638,8 @@ class SessionCatalogSuite extends SparkFunSuite {
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec, part2.spec),
       ignoreIfNotExists = false,
-      purge = false)
+      purge = false,
+      retainData = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
   }
 
@@ -647,14 +650,16 @@ class SessionCatalogSuite extends SparkFunSuite {
         TableIdentifier("tbl1", Some("unknown_db")),
         Seq(),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     intercept[NoSuchTableException] {
       catalog.dropPartitions(
         TableIdentifier("does_not_exist", Some("db2")),
         Seq(),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
   }
 
@@ -665,13 +670,15 @@ class SessionCatalogSuite extends SparkFunSuite {
         TableIdentifier("tbl2", Some("db2")),
         Seq(part3.spec),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     catalog.dropPartitions(
       TableIdentifier("tbl2", Some("db2")),
       Seq(part3.spec),
       ignoreIfNotExists = true,
-      purge = false)
+      purge = false,
+      retainData = false)
   }
 
   test("drop partitions with invalid partition spec") {
@@ -681,7 +688,8 @@ class SessionCatalogSuite extends SparkFunSuite {
         TableIdentifier("tbl2", Some("db2")),
         Seq(partWithMoreColumns.spec),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, b, c) must be contained within " +
@@ -691,7 +699,8 @@ class SessionCatalogSuite extends SparkFunSuite {
         TableIdentifier("tbl2", Some("db2")),
         Seq(partWithUnknownColumns.spec),
         ignoreIfNotExists = false,
-        purge = false)
+        purge = false,
+        retainData = false)
     }
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, unknown) must be contained within " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 5f89a229d6242e56cdf68599bedd4c02f783e210..7a659ea151822985755546a067a52c942149c6bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -833,8 +833,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
-      ctx.EXISTS != null,
-      ctx.PURGE != null)
+      ifExists = ctx.EXISTS != null,
+      purge = ctx.PURGE != null,
+      retainData = false)
   }
 
   /**
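For SQL `ALTER TABLE ... DROP PARTITION`, `retainData` is always `false`: the statement is expected to delete data for managed tables, and the only user-controllable knob is `PURGE`, which now binds to the named `purge` argument. Roughly, the mapping is (command shape from the diff, values illustrative):

```scala
// ALTER TABLE t DROP IF EXISTS PARTITION (dt='2008-08-08') PURGE
// parses to:
AlterTableDropPartitionCommand(
  TableIdentifier("t"),
  Seq(Map("dt" -> "2008-08-08")),
  ifExists = true,
  purge = true,
  retainData = false)
```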
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 0f126d0200eff9d7e9342f76b7d654352c6c53b3..c62c14200c24adf7092cab0bcf2337a52bb354bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -421,7 +421,8 @@ case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
     specs: Seq[TablePartitionSpec],
     ifExists: Boolean,
-    purge: Boolean)
+    purge: Boolean,
+    retainData: Boolean)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -439,7 +440,8 @@ case class AlterTableDropPartitionCommand(
     }
 
     catalog.dropPartitions(
-      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
+      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+      retainData = retainData)
     Seq.empty[Row]
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f3d92bf7cc245f09c80435ca5cb7ee2883c7b2d0..4468dc58e404a281d25f6bc2e04f4a186e2e6536 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -217,16 +217,25 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
             if (deletedPartitions.nonEmpty) {
               AlterTableDropPartitionCommand(
                 l.catalogTable.get.identifier, deletedPartitions.toSeq,
-                ifExists = true, purge = true).run(t.sparkSession)
+                ifExists = true, purge = false,
+                retainData = true /* already deleted */).run(t.sparkSession)
             }
           }
         }
         t.location.refresh()
       }
 
+      val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) {
+        overwrite.staticPartitionKeys.map { case (k, v) =>
+          (partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v)
+        }
+      } else {
+        Map.empty
+      }
+
       val insertCmd = InsertIntoHadoopFsRelationCommand(
         outputPath,
-        if (overwrite.enabled) overwrite.staticPartitionKeys else Map.empty,
+        staticPartitionKeys,
         customPartitionLocations,
         partitionSchema,
         t.bucketSpec,
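Two behavior changes land in `DataSourceAnalysis`. First, when the INSERT OVERWRITE path has already deleted the stale partition directories itself, the follow-up `AlterTableDropPartitionCommand` now runs with `purge = false, retainData = true`, so it only removes the catalog entries instead of attempting a second, purging delete of paths that are already gone. Second, user-written static partition keys are re-cased against the table's partition schema before being handed to `InsertIntoHadoopFsRelationCommand`, so `partition (a=1)` resolves to the actual column `A`. A standalone sketch of that normalization step, with stand-in values for `partitionSchema` and `overwrite.staticPartitionKeys`:

```scala
// Map each user-written key (e.g. "a") to the schema's canonical column
// name (e.g. "A"), keeping the value. Like the rule above, this throws
// if a key matches no partition column.
val partitionColumnNames = Seq("A", "B")  // stand-in for partitionSchema.map(_.name)
val staticKeys = Map("a" -> "1")          // stand-in for overwrite.staticPartitionKeys
val normalized: Map[String, String] = staticKeys.map { case (k, v) =>
  partitionColumnNames.find(_.equalsIgnoreCase(k)).get -> v
}
assert(normalized == Map("A" -> "1"))
```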
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index d31e7aeb3a78ad0cbecbc0827a3ab3907aa91dd8..5ef5f8ee77418aa67a4ca29ed75e0b290f673e67 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -615,7 +615,8 @@ class DDLCommandSuite extends PlanTest {
         Map("dt" -> "2008-08-08", "country" -> "us"),
         Map("dt" -> "2009-09-09", "country" -> "uk")),
       ifExists = true,
-      purge = false)
+      purge = false,
+      retainData = false)
     val expected2_table = expected1_table.copy(ifExists = false)
     val expected1_purge = expected1_table.copy(purge = true)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 065883234a780cdeae6ee16da4e18e948d59bcfa..c213e8e0b22e6c1e657852f262a9fd10dace89fd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -850,9 +850,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       parts: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = withClient {
+      purge: Boolean,
+      retainData: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    client.dropPartitions(db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge)
+    client.dropPartitions(
+      db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData)
   }
 
   override def renamePartitions(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 569a9c11398ea12907b5cad18955f411012b5f74..4c76932b61758a34b9828e2b171f1241c5d003ab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -125,7 +125,8 @@ private[hive] trait HiveClient {
       table: String,
       specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit
+      purge: Boolean,
+      retainData: Boolean): Unit
 
   /**
    * Rename one or many existing table partitions, assuming they exist.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 590029a517e0937b67c9c43c7a50041bf1bcb6e3..bd840af5b1649ba13d1e4d079c7a21661a4bc7ce 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -453,7 +453,8 @@ private[hive] class HiveClientImpl(
       table: String,
       specs: Seq[TablePartitionSpec],
       ignoreIfNotExists: Boolean,
-      purge: Boolean): Unit = withHiveState {
+      purge: Boolean,
+      retainData: Boolean): Unit = withHiveState {
     // TODO: figure out how to drop multiple partitions in one call
     val hiveTable = client.getTable(db, table, true /* throw exception */)
     // do the check at first and collect all the matching partitions
@@ -473,8 +474,7 @@ private[hive] class HiveClientImpl(
     var droppedParts = ArrayBuffer.empty[java.util.List[String]]
     matchingParts.foreach { partition =>
       try {
-        val deleteData = true
-        shim.dropPartition(client, db, table, partition, deleteData, purge)
+        shim.dropPartition(client, db, table, partition, !retainData, purge)
       } catch {
         case e: Exception =>
           val remainingParts = matchingParts.toBuffer -- droppedParts
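On the Hive side, `retainData` maps directly onto the `deleteData` flag that the shim's `dropPartition` already exposes; the removed hard-coded `deleteData = true` is what previously made metadata-only drops impossible through this path. The resulting flag combinations, as wired in the diff:

```scala
// deleteData = !retainData, so:
//   retainData = true                  -> metadata dropped, files kept
//   retainData = false, purge = false  -> files moved to the Hive trash
//   retainData = false, purge = true   -> files deleted permanently, skipping the trash
shim.dropPartition(client, db, table, partition, !retainData, purge)
```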
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index e8e4238d1c5a40a1c5722e88263c7598064a245f..c2ac0327607805d0b328cd15556d8bc29122a3a8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -259,6 +259,41 @@ class PartitionProviderCompatibilitySuite
         }
       }
     }
+
+    test(s"SPARK-18659 insert overwrite table files - partition management $enabled") {
+      withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+        withTable("test") {
+          spark.range(10)
+            .selectExpr("id", "id as A", "'x' as B")
+            .write.partitionBy("A", "B").mode("overwrite")
+            .saveAsTable("test")
+          spark.sql("insert overwrite table test select id, id, 'x' from range(1)")
+          assert(spark.sql("select * from test").count() == 1)
+
+          spark.range(10)
+            .selectExpr("id", "id as A", "'x' as B")
+            .write.partitionBy("A", "B").mode("overwrite")
+            .saveAsTable("test")
+          spark.sql(
+            "insert overwrite table test partition (A, B) select id, id, 'x' from range(1)")
+          assert(spark.sql("select * from test").count() == 1)
+        }
+      }
+    }
+
+    test(s"SPARK-18659 insert overwrite table with lowercase - partition management $enabled") {
+      withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> enabled.toString) {
+        withTable("test") {
+          spark.range(10)
+            .selectExpr("id", "id as A", "'x' as B")
+            .write.partitionBy("A", "B").mode("overwrite")
+            .saveAsTable("test")
+          // note that the partition columns 'A' and 'B' are referenced in lowercase here
+          spark.sql("insert overwrite table test partition (a=1, b) select id, 'x' from range(1)")
+          assert(spark.sql("select * from test").count() == 10)
+        }
+      }
+    }
   }
 
   /**
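It is worth spelling out why the lowercase test expects 10 rows: `range(10)` seeds one row per partition `A=0..9`, and the overwrite with static key `a=1` (normalized to column `A` by the `DataSourceAnalysis` change above) rewrites only the `A=1` partition, again with the single row produced by `range(1)`, leaving the total unchanged:

```scala
// Row arithmetic for the lowercase test, checked in plain Scala:
val before = (0 until 10).map(a => (a.toLong, a.toLong, "x"))  // (id, A, B), one row per A
val after  = before.filterNot(_._2 == 1L) :+ ((0L, 1L, "x"))   // only A=1 is replaced
assert(after.size == 10)
```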
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 081b0ed9bd688c6ffd2225a7b8a8da0318d343b8..16ae345de6d9534d5ccb21daae4db6ad2070c508 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -352,13 +352,13 @@ class VersionsSuite extends SparkFunSuite with Logging {
       // with a version that is older than the minimum (1.2 in this case).
       try {
         client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
-          purge = true)
+          purge = true, retainData = false)
         assert(!versionsWithoutPurge.contains(version))
       } catch {
         case _: UnsupportedOperationException =>
           assert(versionsWithoutPurge.contains(version))
           client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
-            purge = false)
+            purge = false, retainData = false)
       }
 
       assert(client.getPartitionOption("default", "src_part", spec).isEmpty)