From 8942353905c354c4ce31b0d1a44d33feb3dcf737 Mon Sep 17 00:00:00 2001
From: windpiger <songjun@outlook.com>
Date: Sat, 14 Jan 2017 10:53:33 -0800
Subject: [PATCH] [SPARK-19151][SQL] DataFrameWriter.saveAsTable support hive
 overwrite

## What changes were proposed in this pull request?

After [SPARK-19107](https://issues.apache.org/jira/browse/SPARK-19107), we now can treat hive as a data source and create hive tables with DataFrameWriter and Catalog. However, the support is not completed, there are still some cases we do not support.

This PR implement:
DataFrameWriter.saveAsTable work with hive format with overwrite mode

## How was this patch tested?
unit test added

Author: windpiger <songjun@outlook.com>

Closes #16549 from windpiger/saveAsTableWithHiveOverwrite.
---
 .../apache/spark/sql/DataFrameWriter.scala    | 15 ++++++++----
 .../spark/sql/hive/HiveStrategies.scala       |  9 ++++---
 .../sql/hive/execution/HiveDDLSuite.scala     | 24 +++++++++++++++----
 3 files changed, 34 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 82331fdb9b..7fc03bd5ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
@@ -380,17 +380,22 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")
 
       case (true, SaveMode.Overwrite) =>
-        // Get all input data source relations of the query.
+        // Get all input data source or hive relations of the query.
         val srcRelations = df.logicalPlan.collect {
           case LogicalRelation(src: BaseRelation, _, _) => src
+          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
+            relation.catalogTable.identifier
         }
         EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
-          // Only do the check if the table is a data source table (the relation is a BaseRelation).
-          // TODO(cloud-fan): also check hive table relation here when we support overwrite mode
-          // for creating hive tables.
+          // check if the table is a data source table (the relation is a BaseRelation).
           case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
+          // check hive table relation when overwrite mode
+          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable)
+            && srcRelations.contains(relation.catalogTable.identifier) =>
+            throw new AnalysisException(
+              s"Cannot overwrite table $tableName that is also being read from")
           case _ => // OK
         }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 6d5cc5778a..d1f11e78b4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -109,12 +109,11 @@ private[hive] trait HiveStrategies {
           table, partition, planLater(child), overwrite, ifNotExists) :: Nil
 
       case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
-        // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
-        // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
-        // tables yet.
-        if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
+        // Currently `DataFrameWriter.saveAsTable` doesn't support
+        // the Append mode of hive serde tables yet.
+        if (mode == SaveMode.Append) {
           throw new AnalysisException(
-            "CTAS for hive serde tables does not support append or overwrite semantics.")
+            "CTAS for hive serde tables does not support append semantics.")
         }
 
         val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 0af331e67b..e3f1667249 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1314,7 +1314,24 @@ class HiveDDLSuite
         .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
       checkAnswer(spark.table("t"), Row(1, "a"))
 
-      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      Seq("c" -> 1).toDF("i", "j").write.format("hive")
+        .mode(SaveMode.Overwrite).option("fileFormat", "parquet").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row("c", 1))
+
+      var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      assert(DDLUtils.isHiveTable(table))
+      assert(table.storage.inputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+      assert(table.storage.outputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+      assert(table.storage.serde ==
+        Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+
+      Seq(9 -> "x").toDF("i", "j")
+        .write.format("hive").mode(SaveMode.Overwrite).option("fileFormat", "avro").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(9, "x"))
+
+      table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
       assert(DDLUtils.isHiveTable(table))
       assert(table.storage.inputFormat ==
         Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"))
@@ -1324,7 +1341,7 @@ class HiveDDLSuite
         Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
 
       sql("INSERT INTO t SELECT 2, 'b'")
-      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+      checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil)
 
       val e = intercept[AnalysisException] {
         Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
@@ -1340,8 +1357,7 @@ class HiveDDLSuite
       val e3 = intercept[AnalysisException] {
         spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")
       }
-      assert(e3.message.contains(
-        "CTAS for hive serde tables does not support append or overwrite semantics"))
+      assert(e3.message.contains("Cannot overwrite table default.t that is also being read from"))
     }
   }
 
-- 
GitLab