From ec18cd0af497d170bdcec345d845d925fb2880cf Mon Sep 17 00:00:00 2001
From: gatorsmile <gatorsmile@gmail.com>
Date: Wed, 6 Jul 2016 12:09:53 +0800
Subject: [PATCH] [SPARK-16389][SQL] Remove MetastoreRelation from
 SparkHiveWriterContainer and SparkHiveDynamicPartitionWriterContainer

#### What changes were proposed in this pull request?
- Remove the unused `MetastoreRelation` parameter from the constructors of `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer` (see the sketch below).
- Avoid an unnecessary metadata round trip through the Hive client in `InsertIntoHiveTable` by reusing `table.catalogTable` instead of re-fetching the table and hand-building the `dbname.tablename` string.
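
For illustration only, a minimal self-contained Scala sketch of the two changes. `CatalogTableStub`, `WriterContainerBefore`, and `WriterContainerAfter` are simplified stand-ins, not the real Spark classes; the actual containers also take a `JobConf` and a `FileSinkDesc`, and the real qualified name comes from `CatalogTable.qualifiedName`.

```scala
// Simplified stand-in for CatalogTable and its qualifiedName; not the real Spark class.
case class CatalogTableStub(database: String, name: String) {
  def qualifiedName: String = s"$database.$name"
}

// Before: the writer container carried a table reference it never actually used.
class WriterContainerBefore(inputSchema: Seq[String], table: CatalogTableStub)

// After: the unused parameter is dropped from the constructor signature.
class WriterContainerAfter(inputSchema: Seq[String])

object QualifiedNameExample extends App {
  val table = CatalogTableStub("default", "src")
  // Instead of hand-building s"${databaseName}.${tableName}" (or asking the
  // Hive client for the table again), the caller reuses the catalog table it
  // already holds:
  println(table.qualifiedName) // default.src
}
```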

#### How was this patch tested?
Existing test cases already cover these changes.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14062 from gatorsmile/removeMetastoreRelation.
---
 .../sql/hive/execution/InsertIntoHiveTable.scala | 16 ++++++----------
 .../spark/sql/hive/hiveWriterContainers.scala    |  8 +++-----
 2 files changed, 9 insertions(+), 15 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3d58d490a5..eb0c31ced6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -223,22 +223,18 @@ case class InsertIntoHiveTable(
         jobConf,
         fileSinkConf,
         dynamicPartColNames,
-        child.output,
-        table)
+        child.output)
     } else {
       new SparkHiveWriterContainer(
         jobConf,
         fileSinkConf,
-        child.output,
-        table)
+        child.output)
     }
 
     @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
     saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
 
     val outputPath = FileOutputFormat.getOutputPath(jobConf)
-    // Have to construct the format of dbname.tablename.
-    val qualifiedTableName = s"${table.databaseName}.${table.tableName}"
     // TODO: Correctly set holdDDLTime.
     // In most of the time, we should have holdDDLTime = false.
     // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
@@ -260,7 +256,7 @@ case class InsertIntoHiveTable(
         client.synchronized {
           client.loadDynamicPartitions(
             outputPath.toString,
-            qualifiedTableName,
+            table.catalogTable.qualifiedName,
             orderedPartitionSpec,
             overwrite,
             numDynamicPartitions,
@@ -274,13 +270,13 @@ case class InsertIntoHiveTable(
         // scalastyle:on
         val oldPart =
           client.getPartitionOption(
-            client.getTable(table.databaseName, table.tableName),
+            table.catalogTable,
             partitionSpec)
 
         if (oldPart.isEmpty || !ifNotExists) {
             client.loadPartition(
               outputPath.toString,
-              qualifiedTableName,
+              table.catalogTable.qualifiedName,
               orderedPartitionSpec,
               overwrite,
               holdDDLTime,
@@ -291,7 +287,7 @@ case class InsertIntoHiveTable(
     } else {
       client.loadTable(
         outputPath.toString, // TODO: URI
-        qualifiedTableName,
+        table.catalogTable.qualifiedName,
         overwrite,
         holdDDLTime)
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index e65c24e6f1..ea88276bb9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -53,8 +53,7 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 private[hive] class SparkHiveWriterContainer(
     @transient private val jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
-    inputSchema: Seq[Attribute],
-    table: MetastoreRelation)
+    inputSchema: Seq[Attribute])
   extends Logging
   with HiveInspectors
   with Serializable {
@@ -217,9 +216,8 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
     dynamicPartColNames: Array[String],
-    inputSchema: Seq[Attribute],
-    table: MetastoreRelation)
-  extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema, table) {
+    inputSchema: Seq[Attribute])
+  extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema) {
 
   import SparkHiveDynamicPartitionWriterContainer._
 
-- 
GitLab