Commit ea017b55 authored by Reynold Xin

[SPARK-14949][SQL] Remove HiveConf dependency from InsertIntoHiveTable

## What changes were proposed in this pull request?
This patch removes the use of HiveConf from InsertIntoHiveTable. I think this is the last major use of HiveConf, and after this we can try to remove the execution HiveConf.
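In essence, each typed `HiveConf` lookup (`getBoolVar` / `getVar` / `ConfVars.*`) becomes a string-keyed lookup against the session conf, with the Hive default value inlined at the call site. Below is a minimal, self-contained sketch of that pattern; `SimpleConf` and `ConfLookupSketch` are hypothetical stand-ins for Spark's `SQLConf.getConfString`, not part of this patch:

```scala
// Sketch of the lookup pattern this patch switches to. SimpleConf is a
// hypothetical stand-in for Spark's SQLConf; the keys and defaults are the
// Hive settings handled in the diff below.
class SimpleConf(settings: Map[String, String]) {
  def getConfString(key: String, default: String): String =
    settings.getOrElse(key, default)
}

object ConfLookupSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SimpleConf(Map("hive.exec.compress.output" -> "true"))

    // Before: hiveconf.getBoolean(ConfVars.COMPRESSRESULT.varname, ...)
    // After: a plain string key with the Hive default spelled out inline,
    // so no HiveConf or ConfVars import is needed.
    val isCompressed =
      conf.getConfString("hive.exec.compress.output", "false").toBoolean

    val strictMode =
      conf.getConfString("hive.exec.dynamic.partition.mode", "strict")
        .equalsIgnoreCase("strict")

    println(s"compressed output: $isCompressed, strict partition mode: $strictMode")
  }
}
```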

## How was this patch tested?
This is an internal refactoring and should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12728 from rxin/SPARK-14949.
parent 08dc8936
@@ -21,8 +21,6 @@ import java.util
import scala.collection.JavaConverters._
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
@@ -35,7 +33,7 @@ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.SparkException
import org.apache.spark.util.SerializableJobConf
private[hive]
case class InsertIntoHiveTable(
table: MetastoreRelation,
partition: Map[String, Option[String]],
@@ -45,8 +43,6 @@ case class InsertIntoHiveTable(
@transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
@transient private val client = sessionState.metadataHive
-@transient private val hiveconf = sessionState.hiveconf
-@transient private lazy val hiveContext = new Context(hiveconf)
def output: Seq[Attribute] = Seq.empty
@@ -70,7 +66,6 @@ case class InsertIntoHiveTable(
writerContainer.driverSideSetup()
sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
writerContainer.commitJob()
}
/**
@@ -85,19 +80,20 @@ case class InsertIntoHiveTable(
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
-val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
+val hadoopConf = sessionState.newHadoopConf()
+val tmpLocation = new Context(hadoopConf).getExternalTmpPath(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
-val isCompressed = hiveconf.getBoolean(
-  ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
+val isCompressed =
+  sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
if (isCompressed) {
// Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
// and "mapred.output.compression.type" have no impact on ORC because it uses table properties
// to store compression information.
-hiveconf.set("mapred.output.compress", "true")
+hadoopConf.set("mapred.output.compress", "true")
fileSinkConf.setCompressed(true)
-fileSinkConf.setCompressCodec(hiveconf.get("mapred.output.compression.codec"))
-fileSinkConf.setCompressType(hiveconf.get("mapred.output.compression.type"))
+fileSinkConf.setCompressCodec(hadoopConf.get("mapred.output.compression.codec"))
+fileSinkConf.setCompressType(hadoopConf.get("mapred.output.compression.type"))
}
val numDynamicPartitions = partition.values.count(_.isEmpty)
@@ -114,13 +110,15 @@ case class InsertIntoHiveTable(
// Validate partition spec if there exist any dynamic partitions
if (numDynamicPartitions > 0) {
// Report error if dynamic partitioning is not enabled
-if (!hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
+if (!sessionState.conf.getConfString("hive.exec.dynamic.partition", "true").toBoolean) {
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
}
// Report error if dynamic partition strict mode is on but no static partition is found
-if (numStaticPartitions == 0 && hiveconf.getVar(
-  HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+if (numStaticPartitions == 0 &&
+  sessionState.conf.getConfString(
+    "hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict"))
+{
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
}
@@ -131,7 +129,7 @@ case class InsertIntoHiveTable(
}
}
-val jobConf = new JobConf(hiveconf)
+val jobConf = new JobConf(hadoopConf)
val jobConfSer = new SerializableJobConf(jobConf)
// When speculation is on and output committer class name contains "Direct", we should warn
......
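For context on the final hunk above: building the `JobConf` from a fresh per-query Hadoop conf, rather than the shared session `hiveconf`, keeps mutations such as the compression flags scoped to a single write job. A small sketch using only stock Hadoop APIs; `JobConfSketch` is a hypothetical name, and the bare `Configuration` merely stands in for `sessionState.newHadoopConf()`:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

object JobConfSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for sessionState.newHadoopConf(): a fresh per-query copy,
    // so the mutations below never touch a shared session-wide HiveConf.
    val hadoopConf = new Configuration()
    hadoopConf.set("mapred.output.compress", "true")

    // JobConf's copy constructor snapshots the current settings.
    val jobConf = new JobConf(hadoopConf)
    println(jobConf.get("mapred.output.compress")) // prints "true"
  }
}
```

Because the copy constructor snapshots the settings at construction time, later changes to either object do not affect the other, which is what makes the serialized `SerializableJobConf` safe to ship to executors.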