Commit b64fcbd2 authored by Patrick Wendell

Revert "[SPARK-3007][SQL]Add Dynamic Partition support to Spark Sql hive"

This reverts commit 0bbe7fae.
parent b167a8c7
Showing changed files with 299 additions and 226 deletions
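For context, the reverted change (SPARK-3007) added support for Hive's dynamic-partition INSERT syntax, in which partition column values are taken from the query output instead of being fixed in the PARTITION clause. A minimal sketch of that syntax, drawn from the test cases removed further down (the table, columns, and src table come from those tests; the specific values are illustrative):

SET hive.exec.dynamic.partition.mode=nonstrict;
CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT);
-- partcol1 and partcol2 are left unbound in the PARTITION clause, so their
-- values are taken from the last two columns of each SELECT row.
INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
SELECT 1, 1, 1 FROM src WHERE key = 150;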
@@ -220,23 +220,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
*/
override def whiteList = Seq(
"add_part_exist",
"dynamic_partition_skip_default",
"infer_bucket_sort_dyn_part",
"load_dyn_part1",
"load_dyn_part2",
"load_dyn_part3",
"load_dyn_part4",
"load_dyn_part5",
"load_dyn_part6",
"load_dyn_part7",
"load_dyn_part8",
"load_dyn_part9",
"load_dyn_part10",
"load_dyn_part11",
"load_dyn_part12",
"load_dyn_part13",
"load_dyn_part14",
"load_dyn_part14_win",
"add_part_multiple",
"add_partition_no_whitelist",
"add_partition_with_whitelist",
@@ -21,24 +21,20 @@ import java.io.IOException
import java.text.NumberFormat
import java.util.Date
import scala.collection.mutable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
import org.apache.hadoop.hive.ql.plan.FileSinkDesc
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred._
import org.apache.hadoop.io.Writable
import org.apache.spark.sql.Row
import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
/**
* Internal helper class that saves an RDD using a Hive OutputFormat.
* It is based on [[SparkHadoopWriter]].
*/
private[hive] class SparkHiveWriterContainer(
private[hive] class SparkHiveHadoopWriter(
@transient jobConf: JobConf,
fileSinkConf: FileSinkDesc)
extends Logging
@@ -46,7 +42,7 @@ private[hive] class SparkHiveWriterContainer(
with Serializable {
private val now = new Date()
protected val conf = new SerializableWritable(jobConf)
private val conf = new SerializableWritable(jobConf)
private var jobID = 0
private var splitID = 0
@@ -55,75 +51,116 @@ private[hive] class SparkHiveWriterContainer(
private var taID: SerializableWritable[TaskAttemptID] = null
@transient private var writer: FileSinkOperator.RecordWriter = null
@transient private lazy val committer = conf.value.getOutputCommitter
@transient private lazy val jobContext = newJobContext(conf.value, jID.value)
@transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
@transient private lazy val outputFormat =
conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
@transient private var format: HiveOutputFormat[AnyRef, Writable] = null
@transient private var committer: OutputCommitter = null
@transient private var jobContext: JobContext = null
@transient private var taskContext: TaskAttemptContext = null
def driverSideSetup() {
def preSetup() {
setIDs(0, 0, 0)
setConfParams()
committer.setupJob(jobContext)
}
def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) {
setIDs(jobId, splitId, attemptId)
setConfParams()
committer.setupTask(taskContext)
initWriters()
val jCtxt = getJobContext()
getOutputCommitter().setupJob(jCtxt)
}
protected def getOutputName: String = {
val numberFormat = NumberFormat.getInstance()
numberFormat.setMinimumIntegerDigits(5)
numberFormat.setGroupingUsed(false)
val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat)
"part-" + numberFormat.format(splitID) + extension
def setup(jobid: Int, splitid: Int, attemptid: Int) {
setIDs(jobid, splitid, attemptid)
setConfParams()
}
def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = writer
def open() {
val numfmt = NumberFormat.getInstance()
numfmt.setMinimumIntegerDigits(5)
numfmt.setGroupingUsed(false)
def close() {
// Seems the boolean value passed into close does not matter.
writer.close(false)
commit()
}
val extension = Utilities.getFileExtension(
conf.value,
fileSinkConf.getCompressed,
getOutputFormat())
def commitJob() {
committer.commitJob(jobContext)
}
val outputName = "part-" + numfmt.format(splitID) + extension
val path = FileOutputFormat.getTaskOutputPath(conf.value, outputName)
protected def initWriters() {
// NOTE this method is executed at the executor side.
// For Hive tables without partitions or with only static partitions, only 1 writer is needed.
getOutputCommitter().setupTask(getTaskContext())
writer = HiveFileFormatUtils.getHiveRecordWriter(
conf.value,
fileSinkConf.getTableInfo,
conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
fileSinkConf,
FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
Reporter.NULL)
path,
null)
}
protected def commit() {
if (committer.needsTaskCommit(taskContext)) {
def write(value: Writable) {
if (writer != null) {
writer.write(value)
} else {
throw new IOException("Writer is null, open() has not been called")
}
}
def close() {
// Seems the boolean value passed into close does not matter.
writer.close(false)
}
def commit() {
val taCtxt = getTaskContext()
val cmtr = getOutputCommitter()
if (cmtr.needsTaskCommit(taCtxt)) {
try {
committer.commitTask(taskContext)
cmtr.commitTask(taCtxt)
logInfo (taID + ": Committed")
} catch {
case e: IOException =>
logError("Error committing the output of task: " + taID.value, e)
committer.abortTask(taskContext)
cmtr.abortTask(taCtxt)
throw e
}
} else {
logInfo("No need to commit output of task: " + taID.value)
logWarning ("No need to commit output of task: " + taID.value)
}
}
def commitJob() {
// always ? Or if cmtr.needsTaskCommit ?
val cmtr = getOutputCommitter()
cmtr.commitJob(getJobContext())
}
// ********* Private Functions *********
private def getOutputFormat(): HiveOutputFormat[AnyRef,Writable] = {
if (format == null) {
format = conf.value.getOutputFormat()
.asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
}
format
}
private def getOutputCommitter(): OutputCommitter = {
if (committer == null) {
committer = conf.value.getOutputCommitter
}
committer
}
private def getJobContext(): JobContext = {
if (jobContext == null) {
jobContext = newJobContext(conf.value, jID.value)
}
jobContext
}
private def getTaskContext(): TaskAttemptContext = {
if (taskContext == null) {
taskContext = newTaskAttemptContext(conf.value, taID.value)
}
taskContext
}
private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
jobID = jobId
splitID = splitId
@@ -143,7 +180,7 @@ private[hive] class SparkHiveWriterContainer(
}
}
private[hive] object SparkHiveWriterContainer {
private[hive] object SparkHiveHadoopWriter {
def createPathFromString(path: String, conf: JobConf): Path = {
if (path == null) {
throw new IllegalArgumentException("Output path is null")
@@ -153,65 +190,6 @@ private[hive] object SparkHiveWriterContainer {
if (outputPath == null || fs == null) {
throw new IllegalArgumentException("Incorrectly formatted output path")
}
outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
}
private[spark] class SparkHiveDynamicPartitionWriterContainer(
@transient jobConf: JobConf,
fileSinkConf: FileSinkDesc,
dynamicPartColNames: Array[String])
extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
private val defaultPartName = jobConf.get(
ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
@transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
override protected def initWriters(): Unit = {
// NOTE: This method is executed at the executor side.
// Actual writers are created for each dynamic partition on the fly.
writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
}
override def close(): Unit = {
writers.values.foreach(_.close(false))
commit()
}
override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
val dynamicPartPath = dynamicPartColNames
.zip(row.takeRight(dynamicPartColNames.length))
.map { case (col, rawVal) =>
val string = String.valueOf(rawVal)
s"/$col=${if (rawVal == null || string.isEmpty) defaultPartName else string}"
}
.mkString
def newWriter = {
val newFileSinkDesc = new FileSinkDesc(
fileSinkConf.getDirName + dynamicPartPath,
fileSinkConf.getTableInfo,
fileSinkConf.getCompressed)
newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
val path = {
val outputPath = FileOutputFormat.getOutputPath(conf.value)
assert(outputPath != null, "Undefined job output-path")
val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
new Path(workPath, getOutputName)
}
HiveFileFormatUtils.getHiveRecordWriter(
conf.value,
fileSinkConf.getTableInfo,
conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
newFileSinkDesc,
path,
Reporter.NULL)
}
writers.getOrElseUpdate(dynamicPartPath, newWriter)
outputPath.makeQualified(fs)
}
}
@@ -837,6 +837,11 @@ private[hive] object HiveQl {
cleanIdentifier(key.toLowerCase) -> None
}.toMap).getOrElse(Map.empty)
if (partitionKeys.values.exists(p => p.isEmpty)) {
throw new NotImplementedError(s"Do not support INSERT INTO/OVERWRITE with " +
s"dynamic partitioning.")
}
InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, query, overwrite)
case a: ASTNode =>
@@ -19,25 +19,27 @@ package org.apache.spark.sql.hive.execution
import scala.collection.JavaConversions._
import java.util.{HashMap => JHashMap}
import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.MetaStoreUtils
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{Command, SparkPlan, UnaryNode}
import org.apache.spark.sql.hive._
import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter}
/**
* :: DeveloperApi ::
@@ -49,7 +51,7 @@ case class InsertIntoHiveTable(
child: SparkPlan,
overwrite: Boolean)
(@transient sc: HiveContext)
extends UnaryNode with Command {
extends UnaryNode {
@transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
@transient private lazy val hiveContext = new Context(sc.hiveconf)
@@ -99,74 +101,66 @@ case class InsertIntoHiveTable(
}
def saveAsHiveFile(
rdd: RDD[Row],
rdd: RDD[Writable],
valueClass: Class[_],
fileSinkConf: FileSinkDesc,
conf: SerializableWritable[JobConf],
writerContainer: SparkHiveWriterContainer) {
assert(valueClass != null, "Output value class not set")
conf.value.setOutputValueClass(valueClass)
val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
assert(outputFileFormatClassName != null, "Output format class not set")
conf.value.set("mapred.output.format.class", outputFileFormatClassName)
val isCompressed = conf.value.getBoolean(
ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
conf: JobConf,
isCompressed: Boolean) {
if (valueClass == null) {
throw new SparkException("Output value class not set")
}
conf.setOutputValueClass(valueClass)
if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
throw new SparkException("Output format class not set")
}
// Doesn't work in Scala 2.9 due to what may be a generics bug
// TODO: Should we uncomment this for Scala 2.10?
// conf.setOutputFormat(outputFormatClass)
conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
if (isCompressed) {
// Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
// and "mapred.output.compression.type" have no impact on ORC because it uses table properties
// to store compression information.
conf.value.set("mapred.output.compress", "true")
conf.set("mapred.output.compress", "true")
fileSinkConf.setCompressed(true)
fileSinkConf.setCompressCodec(conf.value.get("mapred.output.compression.codec"))
fileSinkConf.setCompressType(conf.value.get("mapred.output.compression.type"))
fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec"))
fileSinkConf.setCompressType(conf.get("mapred.output.compression.type"))
}
conf.value.setOutputCommitter(classOf[FileOutputCommitter])
conf.setOutputCommitter(classOf[FileOutputCommitter])
FileOutputFormat.setOutputPath(
conf.value,
SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
conf,
SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf))
writerContainer.driverSideSetup()
sc.sparkContext.runJob(rdd, writeToFile _)
writerContainer.commitJob()
// Note that this function is executed on executor side
def writeToFile(context: TaskContext, iterator: Iterator[Row]) {
val serializer = newSerializer(fileSinkConf.getTableInfo)
val standardOI = ObjectInspectorUtils
.getStandardObjectInspector(
fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
ObjectInspectorCopyOption.JAVA)
.asInstanceOf[StructObjectInspector]
log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
val outputData = new Array[Any](fieldOIs.length)
val writer = new SparkHiveHadoopWriter(conf, fileSinkConf)
writer.preSetup()
def writeToFile(context: TaskContext, iter: Iterator[Writable]) {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
writerContainer.executorSideSetup(context.stageId, context.partitionId, attemptNumber)
iterator.foreach { row =>
var i = 0
while (i < fieldOIs.length) {
// TODO (lian) avoid per row dynamic dispatching and pattern matching cost in `wrap`
outputData(i) = wrap(row(i), fieldOIs(i))
i += 1
}
writer.setup(context.stageId, context.partitionId, attemptNumber)
writer.open()
val writer = writerContainer.getLocalFileWriter(row)
writer.write(serializer.serialize(outputData, standardOI))
var count = 0
while(iter.hasNext) {
val record = iter.next()
count += 1
writer.write(record)
}
writerContainer.close()
writer.close()
writer.commit()
}
sc.sparkContext.runJob(rdd, writeToFile _)
writer.commitJob()
}
override def execute() = result
/**
* Inserts all the rows in the table into Hive. Row objects are properly serialized with the
* `org.apache.hadoop.hive.serde2.SerDe` and the
@@ -174,57 +168,50 @@ case class InsertIntoHiveTable(
*
* Note: this is run once and then kept to avoid double insertions.
*/
override protected[sql] lazy val sideEffectResult: Seq[Row] = {
private lazy val result: RDD[Row] = {
val childRdd = child.execute()
assert(childRdd != null)
// Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val rdd = childRdd.mapPartitions { iter =>
val serializer = newSerializer(fileSinkConf.getTableInfo)
val standardOI = ObjectInspectorUtils
.getStandardObjectInspector(
fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
ObjectInspectorCopyOption.JAVA)
.asInstanceOf[StructObjectInspector]
val numDynamicPartitions = partition.values.count(_.isEmpty)
val numStaticPartitions = partition.values.count(_.nonEmpty)
val partitionSpec = partition.map {
case (key, Some(value)) => key -> value
case (key, None) => key -> ""
}
// All partition column names in the format of "<column name 1>/<column name 2>/..."
val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull
// Validate partition spec if there exist any dynamic partitions
if (numDynamicPartitions > 0) {
// Report error if dynamic partitioning is not enabled
if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
}
// Report error if dynamic partition strict mode is on but no static partition is found
if (numStaticPartitions == 0 &&
sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
}
val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
val outputData = new Array[Any](fieldOIs.length)
iter.map { row =>
var i = 0
while (i < row.length) {
// Casts Strings to HiveVarchars when necessary.
outputData(i) = wrap(row(i), fieldOIs(i))
i += 1
}
// Report error if any static partition appears after a dynamic partition
val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
isDynamic.init.zip(isDynamic.tail).find(_ == (true, false)).foreach { _ =>
throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
serializer.serialize(outputData, standardOI)
}
}
// ORC stores compression information in table properties. However, there are other formats
// (e.g. RCFile) that rely on Hadoop configurations to store compression information.
val jobConf = new JobConf(sc.hiveconf)
val jobConfSer = new SerializableWritable(jobConf)
val writerContainer = if (numDynamicPartitions > 0) {
val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
} else {
new SparkHiveWriterContainer(jobConf, fileSinkConf)
}
saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
saveAsHiveFile(
rdd,
outputClass,
fileSinkConf,
jobConf,
sc.hiveconf.getBoolean("hive.exec.compress.output", false))
// TODO: Handle dynamic partitioning.
val outputPath = FileOutputFormat.getOutputPath(jobConf)
// Have to construct the format of dbname.tablename.
val qualifiedTableName = s"${table.databaseName}.${table.tableName}"
@@ -233,6 +220,10 @@ case class InsertIntoHiveTable(
// holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
val holdDDLTime = false
if (partition.nonEmpty) {
val partitionSpec = partition.map {
case (key, Some(value)) => key -> value
case (key, None) => key -> "" // Should not reach here right now.
}
val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
db.validatePartitionNameCharacters(partVals)
// inheritTableSpecs is set to true. It should be set to false for a IMPORT query
@@ -240,26 +231,14 @@ case class InsertIntoHiveTable(
val inheritTableSpecs = true
// TODO: Correctly set isSkewedStoreAsSubdir.
val isSkewedStoreAsSubdir = false
if (numDynamicPartitions > 0) {
db.loadDynamicPartitions(
outputPath,
qualifiedTableName,
partitionSpec,
overwrite,
numDynamicPartitions,
holdDDLTime,
isSkewedStoreAsSubdir
)
} else {
db.loadPartition(
outputPath,
qualifiedTableName,
partitionSpec,
overwrite,
holdDDLTime,
inheritTableSpecs,
isSkewedStoreAsSubdir)
}
db.loadPartition(
outputPath,
qualifiedTableName,
partitionSpec,
overwrite,
holdDDLTime,
inheritTableSpecs,
isSkewedStoreAsSubdir)
} else {
db.loadTable(
outputPath,
@@ -272,6 +251,6 @@ case class InsertIntoHiveTable(
// however for now we return an empty list to simplify compatibility checks with hive, which
// does not return anything for insert operations.
// TODO: implement hive compatibility as rules.
Seq.empty[Row]
sc.sparkContext.makeRDD(Nil, 1)
}
}
@@ -19,9 +19,6 @@ package org.apache.spark.sql.hive.execution
import scala.util.Try
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.spark.SparkException
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
@@ -383,7 +380,7 @@ class HiveQuerySuite extends HiveComparisonTest {
def isExplanation(result: SchemaRDD) = {
val explanation = result.select('plan).collect().map { case Row(plan: String) => plan }
explanation.contains("== Physical Plan ==")
explanation.exists(_ == "== Physical Plan ==")
}
test("SPARK-1704: Explain commands as a SchemaRDD") {
@@ -571,91 +568,6 @@ class HiveQuerySuite extends HiveComparisonTest {
case class LogEntry(filename: String, message: String)
case class LogFile(name: String)
createQueryTest("dynamic_partition",
"""
|DROP TABLE IF EXISTS dynamic_part_table;
|CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT);
|
|SET hive.exec.dynamic.partition.mode=nonstrict;
|
|INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
|SELECT 1, 1, 1 FROM src WHERE key=150;
|
|INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
|SELECT 1, NULL, 1 FROM src WHERE key=150;
|
|INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
|SELECT 1, 1, NULL FROM src WHERE key=150;
|
|INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
|SELECT 1, NULL, NULL FROM src WHERE key=150;
|
|DROP TABLE IF EXISTS dynamic_part_table;
""".stripMargin)
test("Dynamic partition folder layout") {
sql("DROP TABLE IF EXISTS dynamic_part_table")
sql("CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT)")
sql("SET hive.exec.dynamic.partition.mode=nonstrict")
val data = Map(
Seq("1", "1") -> 1,
Seq("1", "NULL") -> 2,
Seq("NULL", "1") -> 3,
Seq("NULL", "NULL") -> 4)
data.foreach { case (parts, value) =>
sql(
s"""INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
|SELECT $value, ${parts.mkString(", ")} FROM src WHERE key=150
""".stripMargin)
val partFolder = Seq("partcol1", "partcol2")
.zip(parts)
.map { case (k, v) =>
if (v == "NULL") {
s"$k=${ConfVars.DEFAULTPARTITIONNAME.defaultVal}"
} else {
s"$k=$v"
}
}
.mkString("/")
// Loads partition data to a temporary table to verify contents
val path = s"$warehousePath/dynamic_part_table/$partFolder/part-00000"
sql("DROP TABLE IF EXISTS dp_verify")
sql("CREATE TABLE dp_verify(intcol INT)")
sql(s"LOAD DATA LOCAL INPATH '$path' INTO TABLE dp_verify")
assert(sql("SELECT * FROM dp_verify").collect() === Array(Row(value)))
}
}
test("Partition spec validation") {
sql("DROP TABLE IF EXISTS dp_test")
sql("CREATE TABLE dp_test(key INT, value STRING) PARTITIONED BY (dp INT, sp INT)")
sql("SET hive.exec.dynamic.partition.mode=strict")
// Should throw when using strict dynamic partition mode without any static partition
intercept[SparkException] {
sql(
"""INSERT INTO TABLE dp_test PARTITION(dp)
|SELECT key, value, key % 5 FROM src
""".stripMargin)
}
sql("SET hive.exec.dynamic.partition.mode=nonstrict")
// Should throw when a static partition appears after a dynamic partition
intercept[SparkException] {
sql(
"""INSERT INTO TABLE dp_test PARTITION(dp, sp = 1)
|SELECT key, value, key % 5 FROM src
""".stripMargin)
}
}
test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") {
sparkContext.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs")
sparkContext.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles")
@@ -713,27 +625,27 @@ class HiveQuerySuite extends HiveComparisonTest {
assert(sql("SET").collect().size == 0)
assertResult(Set(testKey -> testVal)) {
collectResults(sql(s"SET $testKey=$testVal"))
collectResults(hql(s"SET $testKey=$testVal"))
}
assert(hiveconf.get(testKey, "") == testVal)
assertResult(Set(testKey -> testVal)) {
collectResults(sql("SET"))
collectResults(hql("SET"))
}
sql(s"SET ${testKey + testKey}=${testVal + testVal}")
assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
collectResults(sql("SET"))
collectResults(hql("SET"))
}
// "set key"
assertResult(Set(testKey -> testVal)) {
collectResults(sql(s"SET $testKey"))
collectResults(hql(s"SET $testKey"))
}
assertResult(Set(nonexistentKey -> "<undefined>")) {
collectResults(sql(s"SET $nonexistentKey"))
collectResults(hql(s"SET $nonexistentKey"))
}
// Assert that sql() should have the same effects as hql() by repeating the above using sql().