diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 66e52ca68af194ba492a32b2193ad5a74922d5f7..e901683be685480cdcced5da94d07ab6de2d95d5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -367,7 +367,7 @@ package object dsl {
       def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
         InsertIntoTable(
           analysis.UnresolvedRelation(TableIdentifier(tableName)),
-          Map.empty, logicalPlan, overwrite, false)
+          Map.empty, logicalPlan, OverwriteOptions(overwrite), false)
 
       def as(alias: String): LogicalPlan = logicalPlan match {
         case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 38e9bb6c162ad942e72cf0d4bb9aea912ecc6171..ac1577b3abb4dfd05a689252fe6d3bf536673e5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -177,12 +177,19 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
       throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
         "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
     }
+    val overwrite = ctx.OVERWRITE != null
+    val overwritePartition =
+      if (overwrite && partitionKeys.nonEmpty && dynamicPartitionKeys.isEmpty) {
+        Some(partitionKeys.map(t => (t._1, t._2.get)))
+      } else {
+        None
+      }
 
     InsertIntoTable(
       UnresolvedRelation(tableIdent, None),
       partitionKeys,
       query,
-      ctx.OVERWRITE != null,
+      OverwriteOptions(overwrite, overwritePartition),
       ctx.EXISTS != null)
   }
 
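Review note: the parser records a specific partition only when OVERWRITE is present and the PARTITION clause is fully static (no dynamic keys). A minimal standalone sketch of that decision, with illustrative names that are not part of the patch:

```scala
import org.apache.spark.sql.catalyst.plans.logical.OverwriteOptions

// partitionKeys maps each partition column to Some(value) for static keys
// and None for dynamic keys, matching the parser's representation.
def toOverwriteOptions(
    overwrite: Boolean,
    partitionKeys: Map[String, Option[String]]): OverwriteOptions = {
  val allStatic = partitionKeys.values.forall(_.isDefined)
  val specific =
    if (overwrite && partitionKeys.nonEmpty && allStatic) {
      Some(partitionKeys.map { case (k, v) => (k, v.get) })  // .get is safe: all static
    } else {
      None
    }
  OverwriteOptions(overwrite, specific)
}
```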
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index a48974c6322ad0cd72266d7326070da94ecdc8b8..7a15c2285d584b369ba4bd0b140c211fc58ea985 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
@@ -345,18 +346,32 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
   override lazy val statistics: Statistics = super.statistics.copy(isBroadcastable = true)
 }
 
+/**
+ * Options for writing new data into a table.
+ *
+ * @param enabled whether to overwrite existing data in the table.
+ * @param specificPartition if set, only data in the given partition will be overwritten.
+ */
+case class OverwriteOptions(
+    enabled: Boolean,
+    specificPartition: Option[CatalogTypes.TablePartitionSpec] = None) {
+  if (specificPartition.isDefined) {
+    assert(enabled, "Overwrite must be enabled when specifying a partition to overwrite.")
+  }
+}
+
 case class InsertIntoTable(
     table: LogicalPlan,
     partition: Map[String, Option[String]],
     child: LogicalPlan,
-    overwrite: Boolean,
+    overwrite: OverwriteOptions,
     ifNotExists: Boolean)
   extends LogicalPlan {
 
   override def children: Seq[LogicalPlan] = child :: Nil
   override def output: Seq[Attribute] = Seq.empty
 
-  assert(overwrite || !ifNotExists)
+  assert(overwrite.enabled || !ifNotExists)
   assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
 
   override lazy val resolved: Boolean = childrenResolved && table.resolved
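Review note: to illustrate the new invariant (`CatalogTypes.TablePartitionSpec` is the usual `Map[String, String]` alias):

```scala
OverwriteOptions(enabled = false)  // plain append
OverwriteOptions(enabled = true)   // overwrite the whole table
OverwriteOptions(enabled = true, specificPartition = Some(Map("partCol" -> "1")))
// Rejected by the assertion above: a target partition without overwrite is meaningless.
// OverwriteOptions(enabled = false, specificPartition = Some(Map("partCol" -> "1")))
```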
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index ca86304d4d400b44f52e1ba92ec8387ee60c7fd0..7400f3430e99c7c6d6986f1016b3e95d65467380 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -180,7 +180,16 @@ class PlanParserSuite extends PlanTest {
         partition: Map[String, Option[String]],
         overwrite: Boolean = false,
         ifNotExists: Boolean = false): LogicalPlan =
-      InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists)
+      InsertIntoTable(
+        table("s"), partition, plan,
+        OverwriteOptions(
+          overwrite,
+          if (overwrite && partition.nonEmpty) {
+            Some(partition.map(kv => (kv._1, kv._2.get)))
+          } else {
+            None
+          }),
+        ifNotExists)
 
     // Single inserts
     assertEqual(s"insert overwrite table s $sql",
@@ -196,9 +205,9 @@ class PlanParserSuite extends PlanTest {
     val plan2 = table("t").where('x > 5).select(star())
     assertEqual("from t insert into s select * limit 1 insert into u select * where x > 5",
       InsertIntoTable(
-        table("s"), Map.empty, plan.limit(1), overwrite = false, ifNotExists = false).union(
+        table("s"), Map.empty, plan.limit(1), OverwriteOptions(false), ifNotExists = false).union(
         InsertIntoTable(
-          table("u"), Map.empty, plan2, overwrite = false, ifNotExists = false)))
+          table("u"), Map.empty, plan2, OverwriteOptions(false), ifNotExists = false)))
   }
 
   test ("insert with if not exists") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 11dd1df9099383d680fefbe013e937e7bfbf5ef7..700f4835ac89aaa54d7ba5aaa5da70bdec9060c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Union}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions, Union}
 import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.types.StructType
@@ -259,7 +259,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         table = UnresolvedRelation(tableIdent),
         partition = Map.empty[String, Option[String]],
         child = df.logicalPlan,
-        overwrite = mode == SaveMode.Overwrite,
+        overwrite = OverwriteOptions(mode == SaveMode.Overwrite),
         ifNotExists = false)).toRdd
   }
 
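Review note: at this API surface the change is invisible: a `DataFrameWriter` overwrite always targets the whole table, and partition-level overwrite is derived only from a SQL PARTITION clause. For example, given some DataFrame `df`:

```scala
import org.apache.spark.sql.SaveMode

// Produces InsertIntoTable(..., OverwriteOptions(true), ...) with no specific partition.
df.write.mode(SaveMode.Overwrite).insertInto("test")
```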
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 092aabc89a36cc84de88e38d437c0c823e3c0f60..443a2ec033a985bf913eacdcaa8fa5a53279572c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -67,7 +67,10 @@ class CatalogFileIndex(
       val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
         table.identifier, filters)
       val partitions = selectedPartitions.map { p =>
-        PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
+        val path = new Path(p.storage.locationUri.get)
+        val fs = path.getFileSystem(hadoopConf)
+        PartitionPath(
+          p.toRow(partitionSchema), path.makeQualified(fs.getUri, fs.getWorkingDirectory))
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
       new PrunedInMemoryFileIndex(
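Review note: partition locations coming back from the metastore may lack a scheme and authority, so they are qualified before being compared with other paths (for example the insert's output path). A self-contained example of the Hadoop calls involved, using a hypothetical location:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val hadoopConf = new Configuration()
val path = new Path("/warehouse/test/partCol=1")  // hypothetical, scheme-less
val fs = path.getFileSystem(hadoopConf)
// On a local filesystem this resolves to file:/warehouse/test/partCol=1,
// so equality checks against other fully qualified paths now behave.
val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
```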
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 34b77cab65def7de11ed2047f34c563ed8b2be59..47c1f9d3fac1e07abc1cca663e95f71385c5affe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -174,14 +176,32 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
       }.flatten
 
-      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
-      if (overwrite && inputPaths.contains(outputPath)) {
+      val mode = if (overwrite.enabled) SaveMode.Overwrite else SaveMode.Append
+      if (overwrite.enabled && inputPaths.contains(outputPath)) {
         throw new AnalysisException(
           "Cannot overwrite a path that is also being read from.")
       }
 
+      val overwritingSinglePartition = (overwrite.specificPartition.isDefined &&
+        t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
+        l.catalogTable.get.partitionProviderIsHive)
+
+      val effectiveOutputPath = if (overwritingSinglePartition) {
+        val partition = t.sparkSession.sessionState.catalog.getPartition(
+          l.catalogTable.get.identifier, overwrite.specificPartition.get)
+        new Path(partition.storage.locationUri.get)
+      } else {
+        outputPath
+      }
+
+      val effectivePartitionSchema = if (overwritingSinglePartition) {
+        Nil
+      } else {
+        query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
+      }
+
       def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
-        if (l.catalogTable.isDefined &&
+        if (l.catalogTable.isDefined && updatedPartitions.nonEmpty &&
             l.catalogTable.get.partitionColumnNames.nonEmpty &&
             l.catalogTable.get.partitionProviderIsHive) {
           val metastoreUpdater = AlterTableAddPartitionCommand(
@@ -194,8 +214,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       }
 
       val insertCmd = InsertIntoHadoopFsRelationCommand(
-        outputPath,
-        query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
+        effectiveOutputPath,
+        effectivePartitionSchema,
         t.bucketSpec,
         t.fileFormat,
         refreshPartitionsCallback,
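Review note: the intent of the two `effective*` values above is that when exactly one static partition of a table with Hive-managed file source partitions is overwritten, the write goes straight to that partition's location (which may be a custom one) with an empty partition schema, since every row already belongs to that partition. Assuming a table `test` partitioned by `partCol`, this is the SQL shape that takes the new path:

```scala
// With spark.sql.hive.manageFilesourcePartitions=true, only the directory
// backing partCol=1 is rewritten, not the table root.
spark.sql(
  """insert overwrite table test
    |partition (partCol=1)
    |select * from range(100)""".stripMargin)
```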
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index b2ff68a833fea5631286ad04cfda372866c20d06..2eba1e9986acdce33f277b97db2fa2057d781c68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OverwriteOptions}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.sources.InsertableRelation
 
@@ -30,7 +30,7 @@ import org.apache.spark.sql.sources.InsertableRelation
 case class InsertIntoDataSourceCommand(
     logicalRelation: LogicalRelation,
     query: LogicalPlan,
-    overwrite: Boolean)
+    overwrite: OverwriteOptions)
   extends RunnableCommand {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
@@ -40,7 +40,7 @@ case class InsertIntoDataSourceCommand(
     val data = Dataset.ofRows(sparkSession, query)
     // Apply the schema of the existing table to the new data.
     val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
-    relation.insert(df, overwrite)
+    relation.insert(df, overwrite.enabled)
 
     // Invalidate the cache.
     sparkSession.sharedState.cacheManager.invalidateCache(logicalRelation)
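Review note: only the boolean flag crosses this boundary; the `InsertableRelation` contract is unchanged, so `specificPartition` is deliberately dropped for data sources that implement it. For reference, the existing interface:

```scala
trait InsertableRelation {
  def insert(data: DataFrame, overwrite: Boolean): Unit
}
```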
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 9d2930948d6ba8d8648cd010ebc20767848bf5a5..ce1e3eb1a5bc9687086018198b0fd2e2b08912fb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -46,7 +46,8 @@ private[hive] trait HiveStrategies {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.InsertIntoTable(
           table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
-        InsertIntoHiveTable(table, partition, planLater(child), overwrite, ifNotExists) :: Nil
+        InsertIntoHiveTable(
+          table, partition, planLater(child), overwrite.enabled, ifNotExists) :: Nil
 
       case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
         val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index ef5a5a001fb6f7b9efc04e2ada3324b4225a5833..cac43597aef21a7840b53b33977e0f92e6e8eb9e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, OverwriteOptions}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.hive.MetastoreRelation
 
@@ -88,7 +88,8 @@ case class CreateHiveTableAsSelectCommand(
     } else {
       try {
         sparkSession.sessionState.executePlan(InsertIntoTable(
-          metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
+          metastoreRelation, Map(), query, overwrite = OverwriteOptions(true),
+          ifNotExists = false)).toRdd
       } catch {
         case NonFatal(e) =>
           // drop the created table.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index 5f16960fb149690d077e9807c43a2d54fe73c761..ac435bf6195b033375c632e9dc2686ccce961719 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -134,4 +134,56 @@ class PartitionProviderCompatibilitySuite
       }
     }
   }
+
+  test("insert overwrite partition of legacy datasource table overwrites entire table") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedDatasourceTable("test", dir)
+          spark.sql(
+            """insert overwrite table test
+              |partition (partCol=1)
+              |select * from range(100)""".stripMargin)
+          assert(spark.sql("select * from test").count() == 100)
+
+          // Dynamic partitions case
+          spark.sql("insert overwrite table test select id, id from range(10)")
+          assert(spark.sql("select * from test").count() == 10)
+        }
+      }
+    }
+  }
+
+  test("insert overwrite partition of new datasource table overwrites just partition") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedDatasourceTable("test", dir)
+          sql("msck repair table test")
+          spark.sql(
+            """insert overwrite table test
+              |partition (partCol=1)
+              |select * from range(100)""".stripMargin)
+          assert(spark.sql("select * from test").count() == 104)
+
+          // Test overwriting a partition that has a custom location
+          withTempDir { dir2 =>
+            sql(
+              s"""alter table test partition (partCol=1)
+                |set location '${dir2.getAbsolutePath}'""".stripMargin)
+            assert(sql("select * from test").count() == 4)
+            sql(
+              """insert overwrite table test
+                |partition (partCol=1)
+                |select * from range(30)""".stripMargin)
+            sql(
+              """insert overwrite table test
+                |partition (partCol=1)
+                |select * from range(20)""".stripMargin)
+            assert(sql("select * from test").count() == 24)
+          }
+        }
+      }
+    }
+  }
 }