From e7033572330bd48b2438f218b0d2cd3fccdeb362 Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian.cs.zju@gmail.com>
Date: Wed, 8 Oct 2014 18:11:18 -0700
Subject: [PATCH] [SPARK-3810][SQL] Makes PreInsertionCasts handle partitions
 properly

Includes partition keys into account when applying `PreInsertionCasts` rule.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2672 from liancheng/fix-pre-insert-casts and squashes the following commits:

def1a1a [Cheng Lian] Makes PreInsertionCasts handle partitions properly
---
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 15 +++-----
 .../sql/hive/execution/HiveQuerySuite.scala   | 36 +++++++++++++++++++
 2 files changed, 41 insertions(+), 10 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index cc0605b0ad..addd5bed84 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -19,31 +19,28 @@ package org.apache.spark.sql.hive
 
 import scala.util.parsing.combinator.RegexParsers
 
-import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, SerDeInfo}
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, SerDeInfo, StorageDescriptor, Partition => TPartition, Table => TTable}
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.ql.stats.StatsSetupConst
 import org.apache.hadoop.hive.serde2.Deserializer
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.Logging
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, Catalog}
+import org.apache.spark.sql.catalyst.analysis.Catalog
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.columnar.InMemoryRelation
-import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.util.Utils
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
 
 private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
-  import HiveMetastoreTypes._
+  import org.apache.spark.sql.hive.HiveMetastoreTypes._
 
   /** Connection to hive metastore.  Usages should lock on `this`. */
   protected[hive] val client = Hive.get(hive.hiveconf)
@@ -137,10 +134,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
     def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan) = {
       val childOutputDataTypes = child.output.map(_.dataType)
-      // Only check attributes, not partitionKeys since they are always strings.
-      // TODO: Fully support inserting into partitioned tables.
       val tableOutputDataTypes =
-        table.attributes.map(_.dataType) ++ table.partitionKeys.map(_.dataType)
+        (table.attributes ++ table.partitionKeys).take(child.output.length).map(_.dataType)
 
       if (childOutputDataTypes == tableOutputDataTypes) {
         p
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 2e282a9ade..2829105f43 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -22,6 +22,7 @@ import scala.util.Try
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
@@ -675,6 +676,41 @@ class HiveQuerySuite extends HiveComparisonTest {
     sql("SELECT * FROM boom").queryExecution.analyzed
   }
 
+  test("SPARK-3810: PreInsertionCasts static partitioning support") {
+    val analyzedPlan = {
+      loadTestTable("srcpart")
+      sql("DROP TABLE IF EXISTS withparts")
+      sql("CREATE TABLE withparts LIKE srcpart")
+      sql("INSERT INTO TABLE withparts PARTITION(ds='1', hr='2') SELECT key, value FROM src")
+        .queryExecution.analyzed
+    }
+
+    assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
+      analyzedPlan.collect {
+        case _: Project => ()
+      }.size
+    }
+  }
+
+  test("SPARK-3810: PreInsertionCasts dynamic partitioning support") {
+    val analyzedPlan = {
+      loadTestTable("srcpart")
+      sql("DROP TABLE IF EXISTS withparts")
+      sql("CREATE TABLE withparts LIKE srcpart")
+      sql("SET hive.exec.dynamic.partition.mode=nonstrict")
+
+      sql("CREATE TABLE IF NOT EXISTS withparts LIKE srcpart")
+      sql("INSERT INTO TABLE withparts PARTITION(ds, hr) SELECT key, value FROM src")
+        .queryExecution.analyzed
+    }
+
+    assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
+      analyzedPlan.collect {
+        case _: Project => ()
+      }.size
+    }
+  }
+
   test("parse HQL set commands") {
     // Adapted from its SQL counterpart.
     val testKey = "spark.sql.key.usedfortestonly"
-- 
GitLab