Commit ec18cd0a authored by gatorsmile, committed by Wenchen Fan

[SPARK-16389][SQL] Remove MetastoreRelation from SparkHiveWriterContainer and SparkHiveDynamicPartitionWriterContainer

#### What changes were proposed in this pull request?
- Remove the unused `MetastoreRelation` parameter from the signatures of `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`.
- Avoid unnecessary metadata retrieval through the Hive client in `InsertIntoHiveTable` by reading the qualified table name from the already-resolved `table.catalogTable` instead of calling back into the metastore (sketched below).
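
The shape of the change, as a minimal self-contained sketch (the classes below are simplified stand-ins for the real Spark types, not the actual implementation):

```scala
// Simplified stand-ins for the Spark/Hive types touched by this change.
case class Attribute(name: String)

case class CatalogTable(database: String, table: String) {
  def qualifiedName: String = s"$database.$table"
}

case class MetastoreRelation(
    databaseName: String,
    tableName: String,
    catalogTable: CatalogTable)

// Before: the writer container carried the whole relation even though it
// only needed the output attributes.
class OldWriterContainer(inputSchema: Seq[Attribute], table: MetastoreRelation)

// After: the relation is dropped from the constructor signature.
class NewWriterContainer(inputSchema: Seq[Attribute])

object Demo extends App {
  val rel = MetastoreRelation("db", "tbl", CatalogTable("db", "tbl"))
  val container = new NewWriterContainer(Seq(Attribute("c1")))

  // The insert path no longer rebuilds "dbname.tablename" by hand or asks
  // the Hive client for metadata it already holds; it reads the qualified
  // name off the catalog table it was resolved with.
  println(rel.catalogTable.qualifiedName) // prints "db.tbl"
}
```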

#### How was this patch tested?
Existing test cases already cover it.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14062 from gatorsmile/removeMetastoreRelation.
parent d0d28507
@@ -223,22 +223,18 @@ case class InsertIntoHiveTable(
         jobConf,
         fileSinkConf,
         dynamicPartColNames,
-        child.output,
-        table)
+        child.output)
     } else {
       new SparkHiveWriterContainer(
         jobConf,
         fileSinkConf,
-        child.output,
-        table)
+        child.output)
     }
 
     @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
     saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
 
     val outputPath = FileOutputFormat.getOutputPath(jobConf)
-    // Have to construct the format of dbname.tablename.
-    val qualifiedTableName = s"${table.databaseName}.${table.tableName}"
     // TODO: Correctly set holdDDLTime.
     // In most of the time, we should have holdDDLTime = false.
     // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
@@ -260,7 +256,7 @@
       client.synchronized {
         client.loadDynamicPartitions(
           outputPath.toString,
-          qualifiedTableName,
+          table.catalogTable.qualifiedName,
           orderedPartitionSpec,
           overwrite,
           numDynamicPartitions,
@@ -274,13 +270,13 @@
         // scalastyle:on
         val oldPart =
           client.getPartitionOption(
-            client.getTable(table.databaseName, table.tableName),
+            table.catalogTable,
             partitionSpec)
 
         if (oldPart.isEmpty || !ifNotExists) {
           client.loadPartition(
             outputPath.toString,
-            qualifiedTableName,
+            table.catalogTable.qualifiedName,
             orderedPartitionSpec,
             overwrite,
             holdDDLTime,
@@ -291,7 +287,7 @@
     } else {
       client.loadTable(
         outputPath.toString, // TODO: URI
-        qualifiedTableName,
+        table.catalogTable.qualifiedName,
         overwrite,
         holdDDLTime)
     }
@@ -53,8 +53,7 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 private[hive] class SparkHiveWriterContainer(
     @transient private val jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
-    inputSchema: Seq[Attribute],
-    table: MetastoreRelation)
+    inputSchema: Seq[Attribute])
   extends Logging
   with HiveInspectors
   with Serializable {
@@ -217,9 +216,8 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
     dynamicPartColNames: Array[String],
-    inputSchema: Seq[Attribute],
-    table: MetastoreRelation)
-  extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema, table) {
+    inputSchema: Seq[Attribute])
+  extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema) {
 
   import SparkHiveDynamicPartitionWriterContainer._