From 85ee0cabe87a27b6947c2d3e8525f04c77f80f6f Mon Sep 17 00:00:00 2001
From: Daoyuan Wang <daoyuan.wang@intel.com>
Date: Mon, 13 Apr 2015 14:29:07 -0700
Subject: [PATCH] [SPARK-6130] [SQL] support if not exists for insert overwrite
 into partition in hiveQl
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This adds support for the optional [IF NOT EXISTS] clause in Hive's
INSERT OVERWRITE ... PARTITION syntax, as documented in the Hive language
manual and quoted below.

Standard syntax:
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1 FROM from_statement;
INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;
 
Hive extension (multiple inserts):
FROM from_statement
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1
[INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2]
[INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;
FROM from_statement
INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1
[INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2]
[INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2] ...;
 
Hive extension (dynamic partition inserts):
INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement;
INSERT INTO TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement;
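
With IF NOT EXISTS, an INSERT OVERWRITE into a partition that already exists
becomes a no-op instead of replacing the partition's data. A minimal usage
sketch in Scala, assuming a HiveContext named hiveContext and hypothetical
tables dest, src and src2:

  // The first statement replaces partition ds='2015-04-13' unconditionally;
  // run afterwards, the second statement leaves that partition untouched
  // because it already exists.
  hiveContext.sql(
    "INSERT OVERWRITE TABLE dest PARTITION (ds='2015-04-13') " +
      "SELECT key, value FROM src")
  hiveContext.sql(
    "INSERT OVERWRITE TABLE dest PARTITION (ds='2015-04-13') IF NOT EXISTS " +
      "SELECT key, value FROM src2")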

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #4865 from adrian-wang/insertoverwrite and squashes the following commits:

2fce94f [Daoyuan Wang] add assert
10ea6f3 [Daoyuan Wang] add name for boolean parameter
0bbe9b9 [Daoyuan Wang] fix failure
4391154 [Daoyuan Wang] support if not exists for insert overwrite into partition in hiveQl
---
 .../apache/spark/sql/catalyst/SqlParser.scala |  2 +-
 .../sql/catalyst/analysis/Analyzer.scala      |  2 +-
 .../spark/sql/catalyst/dsl/package.scala      |  2 +-
 .../plans/logical/basicOperators.scala        |  4 ++-
 .../org/apache/spark/sql/DataFrame.scala      |  2 +-
 .../spark/sql/execution/SparkStrategies.scala |  3 +-
 .../sql/sources/DataSourceStrategy.scala      |  2 +-
 .../org/apache/spark/sql/sources/rules.scala  |  7 +++--
 .../execution/HiveCompatibilitySuite.scala    |  1 +
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 19 ++++++------
 .../org/apache/spark/sql/hive/HiveQl.scala    | 22 ++++++++++++-
 .../spark/sql/hive/HiveStrategies.scala       | 10 +++---
 .../hive/execution/CreateTableAsSelect.scala  |  2 +-
 .../hive/execution/InsertIntoHiveTable.scala  | 31 +++++++++++++------
 ...titions-0-d5edc0daa94b33915df794df3b710774 |  0
 ...titions-1-9eb9372f4855928fae16f5fa554b3a62 |  0
 ...itions-10-ec2cef3d37146c450c60202a572f5cab |  0
 ...itions-11-8854d6001200fc11529b2e2da755e5a2 |  0
 ...itions-12-71ff68fda0aa7a36cb50d8fab0d70d25 |  0
 ...titions-13-7e4e7d7003fc6ef17bc19c3461ad899 |  0
 ...itions-14-ec2cef3d37146c450c60202a572f5cab |  0
 ...itions-15-a3b2e230efde74e970ae8a3b55f383fc |  0
 ...titions-2-8396c17a66e3d9a374d4361873b9bfe3 |  0
 ...titions-3-3876bb356dd8af7e78d061093d555457 |  0
 ...rtitions-4-528e23afb272c2e69004c86ddaa70ee |  0
 ...titions-5-de5d56456c28d63775554e56355911d2 |  0
 ...titions-6-3efdc331b3b4bdac3e60c757600fff53 |  5 +++
 ...titions-7-92f6af82704504968de078c133f222f8 |  0
 ...titions-8-316cad7c63ddd4fb043be2affa5b0a67 |  0
 ...titions-9-3efdc331b3b4bdac3e60c757600fff53 |  5 +++
 30 files changed, 84 insertions(+), 35 deletions(-)
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-0-d5edc0daa94b33915df794df3b710774
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-1-9eb9372f4855928fae16f5fa554b3a62
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-10-ec2cef3d37146c450c60202a572f5cab
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-11-8854d6001200fc11529b2e2da755e5a2
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-12-71ff68fda0aa7a36cb50d8fab0d70d25
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-13-7e4e7d7003fc6ef17bc19c3461ad899
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-14-ec2cef3d37146c450c60202a572f5cab
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-15-a3b2e230efde74e970ae8a3b55f383fc
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-2-8396c17a66e3d9a374d4361873b9bfe3
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-3-3876bb356dd8af7e78d061093d555457
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-4-528e23afb272c2e69004c86ddaa70ee
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-5-de5d56456c28d63775554e56355911d2
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-6-3efdc331b3b4bdac3e60c757600fff53
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-7-92f6af82704504968de078c133f222f8
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-8-316cad7c63ddd4fb043be2affa5b0a67
 create mode 100644 sql/hive/src/test/resources/golden/insert1_overwrite_partitions-9-3efdc331b3b4bdac3e60c757600fff53

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index ee04cb579d..bc8d3751f6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -155,7 +155,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
 
   protected lazy val insert: Parser[LogicalPlan] =
     INSERT ~> (OVERWRITE ^^^ true | INTO ^^^ false) ~ (TABLE ~> relation) ~ select ^^ {
-      case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o)
+      case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o, false)
     }
 
   protected lazy val cte: Parser[LogicalPlan] =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 50702ac683..8b68b0df35 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -193,7 +193,7 @@ class Analyzer(
       }
 
       realPlan transform {
-        case i@InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
+        case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
           i.copy(
             table = EliminateSubQueries(getTable(u, cteRelations)))
         case u: UnresolvedRelation =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 145f062dd6..21c15ad14f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -293,7 +293,7 @@ package object dsl {
 
     def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
       InsertIntoTable(
-        analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite)
+        analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite, false)
 
     def analyze: LogicalPlan = EliminateSubQueries(analysis.SimpleAnalyzer(logicalPlan))
   }
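
The Catalyst DSL keeps its two-argument insertInto and always passes
ifNotExists = false. A usage sketch, assuming the catalyst dsl implicits
import as shown below and a hypothetical table name t:

  import org.apache.spark.sql.catalyst.dsl.expressions._
  import org.apache.spark.sql.catalyst.dsl.plans._
  import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

  // A toy single-column relation standing in for a real child plan.
  val query = LocalRelation('a.int)
  // Both forms build InsertIntoTable(..., ifNotExists = false).
  val append    = query.insertInto("t")
  val overwrite = query.insertInto("t", overwrite = true)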
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 5d31a6eecf..17522976dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -125,12 +125,14 @@ case class InsertIntoTable(
     table: LogicalPlan,
     partition: Map[String, Option[String]],
     child: LogicalPlan,
-    overwrite: Boolean)
+    overwrite: Boolean,
+    ifNotExists: Boolean)
   extends LogicalPlan {
 
   override def children: Seq[LogicalPlan] = child :: Nil
   override def output: Seq[Attribute] = child.output
 
+  assert(overwrite || !ifNotExists)
   override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall {
     case (childAttr, tableAttr) =>
       DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType)
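
The added assertion encodes the rule that IF NOT EXISTS is only legal
together with OVERWRITE: the one rejected combination is overwrite = false
with ifNotExists = true. A standalone sketch of the same invariant (the case
class below is a simplified stand-in, not the real plan node):

  // Simplified stand-in; the real InsertIntoTable also carries the table,
  // partition spec and child plan.
  case class InsertFlags(overwrite: Boolean, ifNotExists: Boolean) {
    // ifNotExists implies overwrite.
    assert(overwrite || !ifNotExists)
  }

  InsertFlags(overwrite = true,  ifNotExists = true)   // ok: OVERWRITE ... IF NOT EXISTS
  InsertFlags(overwrite = true,  ifNotExists = false)  // ok: plain OVERWRITE
  InsertFlags(overwrite = false, ifNotExists = false)  // ok: plain INSERT INTO
  // InsertFlags(overwrite = false, ifNotExists = true) would fail the assert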
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 9b9adf8550..94ae2d65fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1209,7 +1209,7 @@ class DataFrame private[sql](
   @Experimental
   def insertInto(tableName: String, overwrite: Boolean): Unit = {
     sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)),
-      Map.empty, logicalPlan, overwrite)).toRdd
+      Map.empty, logicalPlan, overwrite, ifNotExists = false)).toRdd
   }
 
   /**
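
The public DataFrame API is behaviorally unchanged: insertInto always passes
ifNotExists = false, so overwrites issued from a DataFrame still replace an
existing partition. A usage sketch, where df is a hypothetical DataFrame:

  // Both calls build an InsertIntoTable plan with ifNotExists = false.
  df.insertInto("dest")                    // append
  df.insertInto("dest", overwrite = true)  // unconditional overwrite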
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 23f7e56094..5268b73340 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -211,7 +211,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
         // Note: overwrite=false because otherwise the metadata we just created will be deleted
         InsertIntoParquetTable(relation, planLater(child), overwrite = false) :: Nil
-      case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
+      case logical.InsertIntoTable(
+          table: ParquetRelation, partition, child, overwrite, ifNotExists) =>
         InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
         val prunePushedDownFilters =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index e13759b7fe..34d048e426 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -56,7 +56,7 @@ private[sql] object DataSourceStrategy extends Strategy {
       execution.PhysicalRDD(l.output, t.buildScan()) :: Nil
 
     case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: InsertableRelation), part, query, overwrite) if part.isEmpty =>
+      l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty =>
       execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
 
     case _ => Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index 5a78001117..6ed68d179e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -37,7 +37,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
 
       // We are inserting into an InsertableRelation.
       case i @ InsertIntoTable(
-      l @ LogicalRelation(r: InsertableRelation), partition, child, overwrite) => {
+      l @ LogicalRelation(r: InsertableRelation), partition, child, overwrite, ifNotExists) => {
         // First, make sure the data to be inserted have the same number of fields with the
         // schema of the relation.
         if (l.output.size != child.output.size) {
@@ -84,7 +84,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite) =>
+        l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite, ifNotExists) =>
         // Right now, we do not support insert into a data source table with partition specs.
         if (partition.nonEmpty) {
           failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
@@ -102,7 +102,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         }
 
       case i @ logical.InsertIntoTable(
-        l: LogicalRelation, partition, query, overwrite) if !l.isInstanceOf[InsertableRelation] =>
+        l: LogicalRelation, partition, query, overwrite, ifNotExists)
+          if !l.isInstanceOf[InsertableRelation] =>
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")
 
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 2ae9d018e1..81ee48ef41 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -532,6 +532,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "inputddl7",
     "inputddl8",
     "insert1",
+    "insert1_overwrite_partitions",
     "insert2_overwrite_partitions",
     "insert_compressed",
     "join0",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 3ed5c5b031..f1c0bd92aa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -527,7 +527,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       // Collects all `MetastoreRelation`s which should be replaced
       val toBeReplaced = plan.collect {
         // Write path
-        case InsertIntoTable(relation: MetastoreRelation, _, _, _)
+        case InsertIntoTable(relation: MetastoreRelation, _, _, _, _)
             // Inserting into partitioned table is not supported in Parquet data source (yet).
             if !relation.hiveQlTable.isPartitioned &&
               hive.convertMetastoreParquet &&
@@ -538,7 +538,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           (relation, parquetRelation, attributedRewrites)
 
         // Write path
-        case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
+        case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _, _)
           // Inserting into partitioned table is not supported in Parquet data source (yet).
           if !relation.hiveQlTable.isPartitioned &&
             hive.convertMetastoreParquet &&
@@ -569,15 +569,15 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           val alias = r.alias.getOrElse(r.tableName)
           Subquery(alias, parquetRelation)
 
-        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
+        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
           if relationMap.contains(r) =>
           val parquetRelation = relationMap(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite)
+          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
 
-        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
+        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
           if relationMap.contains(r) =>
           val parquetRelation = relationMap(r)
-          InsertIntoTable(parquetRelation, partition, child, overwrite)
+          InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
 
         case other => other.transformExpressions {
           case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
@@ -698,7 +698,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
 
-      case p @ InsertIntoTable(table: MetastoreRelation, _, child, _) =>
+      case p @ InsertIntoTable(table: MetastoreRelation, _, child, _, _) =>
         castChildOutput(p, table, child)
     }
 
@@ -715,7 +715,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
           .forall { case (left, right) => left.sameType(right) }) {
         // If both types ignoring nullability of ArrayType, MapType, StructType are the same,
         // use InsertIntoHiveTable instead of InsertIntoTable.
-        InsertIntoHiveTable(p.table, p.partition, p.child, p.overwrite)
+        InsertIntoHiveTable(p.table, p.partition, p.child, p.overwrite, p.ifNotExists)
       } else {
         // Only do the casting when child output data types differ from table output data types.
         val castedChildOutput = child.output.zip(table.output).map {
@@ -753,7 +753,8 @@ private[hive] case class InsertIntoHiveTable(
     table: LogicalPlan,
     partition: Map[String, Option[String]],
     child: LogicalPlan,
-    overwrite: Boolean)
+    overwrite: Boolean,
+    ifNotExists: Boolean)
   extends LogicalPlan {
 
   override def children: Seq[LogicalPlan] = child :: Nil
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index b2ae74efeb..53a204b8c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -1002,7 +1002,27 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           cleanIdentifier(key.toLowerCase) -> None
       }.toMap).getOrElse(Map.empty)
 
-      InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite)
+      InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite, false)
+
+    case Token(destinationToken(),
+           Token("TOK_TAB",
+             tableArgs) ::
+           Token("TOK_IFNOTEXISTS",
+             ifNotExists) :: Nil) =>
+      val Some(tableNameParts) :: partitionClause :: Nil =
+        getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs)
+
+      val tableIdent = extractTableIdent(tableNameParts)
+
+      val partitionKeys = partitionClause.map(_.getChildren.map {
+        // Parse partitions. We also make keys case insensitive.
+        case Token("TOK_PARTVAL", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
+          cleanIdentifier(key.toLowerCase) -> Some(PlanUtils.stripQuotes(value))
+        case Token("TOK_PARTVAL", Token(key, Nil) :: Nil) =>
+          cleanIdentifier(key.toLowerCase) -> None
+      }.toMap).getOrElse(Map.empty)
+
+      InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite, true)
 
     case a: ASTNode =>
       throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")
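
The new case fires when Hive's parser attaches a TOK_IFNOTEXISTS child to the
destination node, and it builds the plan with ifNotExists = true; the
pre-existing case continues to handle the clause-free form. A hedged sketch of
exercising the two paths (HiveQl.parseSql is an internal API, shown only for
illustration; t and s are hypothetical tables):

  // Without the clause: InsertIntoTable(..., overwrite = true, ifNotExists = false)
  HiveQl.parseSql("INSERT OVERWRITE TABLE t PARTITION (ds='1') SELECT * FROM s")
  // With the clause: InsertIntoTable(..., overwrite = true, ifNotExists = true)
  HiveQl.parseSql(
    "INSERT OVERWRITE TABLE t PARTITION (ds='1') IF NOT EXISTS SELECT * FROM s")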
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5f7e897295..1ccb0c279c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -184,12 +184,14 @@ private[hive] trait HiveStrategies {
 
   object DataSinks extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
+      case logical.InsertIntoTable(
+          table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
         execution.InsertIntoHiveTable(
-          table, partition, planLater(child), overwrite) :: Nil
-      case hive.InsertIntoHiveTable(table: MetastoreRelation, partition, child, overwrite) =>
+          table, partition, planLater(child), overwrite, ifNotExists) :: Nil
+      case hive.InsertIntoHiveTable(
+          table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
         execution.InsertIntoHiveTable(
-          table, partition, planLater(child), overwrite) :: Nil
+          table, partition, planLater(child), overwrite, ifNotExists) :: Nil
       case _ => Nil
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index fade9e5852..76a1965f3c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -67,7 +67,7 @@ case class CreateTableAsSelect(
           new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName")
       }
     } else {
-      hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
+      hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd
     }
 
     Seq.empty[Row]
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 6c96747439..89995a91b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -45,7 +45,8 @@ case class InsertIntoHiveTable(
     table: MetastoreRelation,
     partition: Map[String, Option[String]],
     child: SparkPlan,
-    overwrite: Boolean) extends UnaryNode with HiveInspectors {
+    overwrite: Boolean,
+    ifNotExists: Boolean) extends UnaryNode with HiveInspectors {
 
   @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
   @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
@@ -219,15 +220,25 @@ case class InsertIntoHiveTable(
             isSkewedStoreAsSubdir)
         }
       } else {
-        catalog.synchronized {
-          catalog.client.loadPartition(
-            outputPath,
-            qualifiedTableName,
-            orderedPartitionSpec,
-            overwrite,
-            holdDDLTime,
-            inheritTableSpecs,
-            isSkewedStoreAsSubdir)
+        // scalastyle:off
+        // ifNotExists is only valid with static partition, refer to
+        // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
+        // scalastyle:on
+        val oldPart = catalog.synchronized {
+          catalog.client.getPartition(
+            catalog.client.getTable(qualifiedTableName), partitionSpec, false)
+        }
+        if (oldPart == null || !ifNotExists) {
+          catalog.synchronized {
+            catalog.client.loadPartition(
+              outputPath,
+              qualifiedTableName,
+              orderedPartitionSpec,
+              overwrite,
+              holdDDLTime,
+              inheritTableSpecs,
+              isSkewedStoreAsSubdir)
+          }
         }
       }
     } else {
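
The runtime change reduces to one guard: the partition is loaded unless it
already exists and IF NOT EXISTS was given. A distilled sketch of that
decision, where existingPartition stands in for the result of
catalog.client.getPartition (null when the partition is absent):

  def shouldLoad(existingPartition: AnyRef, ifNotExists: Boolean): Boolean =
    existingPartition == null || !ifNotExists

  shouldLoad(null, ifNotExists = true)        // true:  absent, so load it
  shouldLoad(new Object, ifNotExists = true)  // false: exists, keep old data
  shouldLoad(new Object, ifNotExists = false) // true:  plain overwrite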
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-0-d5edc0daa94b33915df794df3b710774 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-0-d5edc0daa94b33915df794df3b710774
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-1-9eb9372f4855928fae16f5fa554b3a62 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-1-9eb9372f4855928fae16f5fa554b3a62
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-10-ec2cef3d37146c450c60202a572f5cab b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-10-ec2cef3d37146c450c60202a572f5cab
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-11-8854d6001200fc11529b2e2da755e5a2 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-11-8854d6001200fc11529b2e2da755e5a2
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-12-71ff68fda0aa7a36cb50d8fab0d70d25 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-12-71ff68fda0aa7a36cb50d8fab0d70d25
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-13-7e4e7d7003fc6ef17bc19c3461ad899 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-13-7e4e7d7003fc6ef17bc19c3461ad899
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-14-ec2cef3d37146c450c60202a572f5cab b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-14-ec2cef3d37146c450c60202a572f5cab
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-15-a3b2e230efde74e970ae8a3b55f383fc b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-15-a3b2e230efde74e970ae8a3b55f383fc
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-2-8396c17a66e3d9a374d4361873b9bfe3 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-2-8396c17a66e3d9a374d4361873b9bfe3
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-3-3876bb356dd8af7e78d061093d555457 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-3-3876bb356dd8af7e78d061093d555457
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-4-528e23afb272c2e69004c86ddaa70ee b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-4-528e23afb272c2e69004c86ddaa70ee
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-5-de5d56456c28d63775554e56355911d2 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-5-de5d56456c28d63775554e56355911d2
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-6-3efdc331b3b4bdac3e60c757600fff53 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-6-3efdc331b3b4bdac3e60c757600fff53
new file mode 100644
index 0000000000..185a91c110
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-6-3efdc331b3b4bdac3e60c757600fff53
@@ -0,0 +1,5 @@
+98	val_98
+98	val_98
+97	val_97
+97	val_97
+96	val_96
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-7-92f6af82704504968de078c133f222f8 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-7-92f6af82704504968de078c133f222f8
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-8-316cad7c63ddd4fb043be2affa5b0a67 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-8-316cad7c63ddd4fb043be2affa5b0a67
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-9-3efdc331b3b4bdac3e60c757600fff53 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-9-3efdc331b3b4bdac3e60c757600fff53
new file mode 100644
index 0000000000..185a91c110
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-9-3efdc331b3b4bdac3e60c757600fff53
@@ -0,0 +1,5 @@
+98	val_98
+98	val_98
+97	val_97
+97	val_97
+96	val_96
-- 
GitLab