Commit e7033572 authored by Cheng Lian, committed by Michael Armbrust

[SPARK-3810][SQL] Makes PreInsertionCasts handle partitions properly

Takes partition keys into account when applying the `PreInsertionCasts` rule.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2672 from liancheng/fix-pre-insert-casts and squashes the following commits:

def1a1a [Cheng Lian] Makes PreInsertionCasts handle partitions properly
parent 4ec93195
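To make the one-line summary concrete: with a static `PARTITION(ds='1', hr='2')` clause, the `SELECT` produces only the data columns, so a rule that always compares the child output against the data columns plus all partition keys can never find a match and keeps rewriting the plan. Below is a minimal standalone sketch of the comparison before and after this change (plain Scala, no Spark dependencies; `tableAttributes`, `partitionKeys`, and the object name are illustrative stand-ins, not Catalyst API):

object SchemaAlignmentSketch extends App {
  // Hypothetical column types for a table shaped like srcpart:
  // (key INT, value STRING) partitioned by (ds STRING, hr STRING).
  val tableAttributes = Seq("int", "string")
  val partitionKeys   = Seq("string", "string")

  // INSERT ... PARTITION(ds='1', hr='2') SELECT key, value FROM src
  // yields just the two data columns.
  val staticChild = Seq("int", "string")

  // Before: always compare against data columns plus all partition keys.
  val oldExpected = tableAttributes ++ partitionKeys
  println(staticChild == oldExpected)  // false: 2 columns vs 4, never matches

  // After: compare only as many table columns as the child actually provides.
  val newExpected = (tableAttributes ++ partitionKeys).take(staticChild.length)
  println(staticChild == newExpected)  // true: plan is left unchanged

  // Dynamic partitioning (PARTITION(ds, hr)) supplies all four columns,
  // so take() keeps everything and partition key types are still checked.
  val dynamicChild = Seq("int", "string", "string", "string")
  println(dynamicChild == (tableAttributes ++ partitionKeys).take(dynamicChild.length))  // true
}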
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -19,31 +19,28 @@ package org.apache.spark.sql.hive
 
 import scala.util.parsing.combinator.RegexParsers
 
-import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, SerDeInfo}
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
+import org.apache.hadoop.hive.metastore.api.{FieldSchema, SerDeInfo, StorageDescriptor, Partition => TPartition, Table => TTable}
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.ql.stats.StatsSetupConst
 import org.apache.hadoop.hive.serde2.Deserializer
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.Logging
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, Catalog}
+import org.apache.spark.sql.catalyst.analysis.Catalog
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.columnar.InMemoryRelation
-import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.util.Utils
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
 
 private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
-  import HiveMetastoreTypes._
+  import org.apache.spark.sql.hive.HiveMetastoreTypes._
 
   /** Connection to hive metastore. Usages should lock on `this`. */
   protected[hive] val client = Hive.get(hive.hiveconf)
@@ -137,10 +134,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
   def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan) = {
     val childOutputDataTypes = child.output.map(_.dataType)
-    // Only check attributes, not partitionKeys since they are always strings.
-    // TODO: Fully support inserting into partitioned tables.
     val tableOutputDataTypes =
-      table.attributes.map(_.dataType) ++ table.partitionKeys.map(_.dataType)
+      (table.attributes ++ table.partitionKeys).take(child.output.length).map(_.dataType)
     if (childOutputDataTypes == tableOutputDataTypes) {
       p
...
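When the aligned type sequences still differ, `castChildOutput` needs to insert casts; the branch that does so is elided from the hunk above. A hedged sketch of the general shape of that step, assuming it pairs each child column with the table column it feeds and retypes only the mismatched ones (plain case classes stand in for Catalyst's `Cast`/`Alias` expressions; all names are illustrative):

object CastingProjectionSketch extends App {
  case class Column(name: String, dataType: String)

  // The child produces an INT where the table expects a STRING.
  val childOutput = Seq(Column("key", "int"), Column("value", "int"))
  val tableOutput = Seq(Column("key", "int"), Column("value", "string"))

  // "Cast" (here: retype) only the mismatched columns; the real rule would
  // then wrap the child plan in a single Project over this column list.
  val casted = childOutput.zip(tableOutput).map {
    case (in, out) if in.dataType != out.dataType => Column(in.name, out.dataType)
    case (in, _) => in
  }

  println(casted)  // List(Column(key,int), Column(value,string))
}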
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -22,6 +22,7 @@ import scala.util.Try
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
@@ -675,6 +676,41 @@ class HiveQuerySuite extends HiveComparisonTest {
     sql("SELECT * FROM boom").queryExecution.analyzed
   }
 
+  test("SPARK-3810: PreInsertionCasts static partitioning support") {
+    val analyzedPlan = {
+      loadTestTable("srcpart")
+      sql("DROP TABLE IF EXISTS withparts")
+      sql("CREATE TABLE withparts LIKE srcpart")
+      sql("INSERT INTO TABLE withparts PARTITION(ds='1', hr='2') SELECT key, value FROM src")
+        .queryExecution.analyzed
+    }
+
+    assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
+      analyzedPlan.collect {
+        case _: Project => ()
+      }.size
+    }
+  }
+
+  test("SPARK-3810: PreInsertionCasts dynamic partitioning support") {
+    val analyzedPlan = {
+      loadTestTable("srcpart")
+      sql("DROP TABLE IF EXISTS withparts")
+      sql("CREATE TABLE withparts LIKE srcpart")
+      sql("SET hive.exec.dynamic.partition.mode=nonstrict")
+
+      sql("CREATE TABLE IF NOT EXISTS withparts LIKE srcpart")
+      sql("INSERT INTO TABLE withparts PARTITION(ds, hr) SELECT key, value FROM src")
+        .queryExecution.analyzed
+    }
+
+    assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
+      analyzedPlan.collect {
+        case _: Project => ()
+      }.size
+    }
+  }
+
   test("parse HQL set commands") {
     // Adapted from its SQL counterpart.
     val testKey = "spark.sql.key.usedfortestonly"
...
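Both new tests assert that the analyzed plan contains exactly one `Project`: the one contributed by the `SELECT` clause itself. Before this fix the type comparison could not succeed for partitioned inserts, so the rule could re-wrap the child in redundant casting projections, which is what the "Duplicated project detected" message guards against. The counting idiom `analyzedPlan.collect { case _: Project => () }.size` collects one element per matching node while walking the plan tree; a minimal standalone analogue on a toy tree (not Catalyst itself; all names are illustrative):

object CollectSketch extends App {
  sealed trait Plan {
    def children: Seq[Plan]
    // Pre-order traversal that collects a value for every node the partial
    // function matches, mirroring the behavior of Catalyst's TreeNode.collect.
    def collect[B](pf: PartialFunction[Plan, B]): Seq[B] =
      pf.lift(this).toSeq ++ children.flatMap(_.collect(pf))
  }
  case class Project(child: Plan) extends Plan { def children = Seq(child) }
  case class InsertIntoTable(child: Plan) extends Plan { def children = Seq(child) }
  case object TableScan extends Plan { def children = Seq.empty }

  // A healthy analyzed plan: a single Project from the SELECT clause.
  val good = InsertIntoTable(Project(TableScan))
  // What the bug produced: an extra casting Project wrapped around it.
  val buggy = InsertIntoTable(Project(Project(TableScan)))

  println(good.collect { case _: Project => () }.size)   // 1
  println(buggy.collect { case _: Project => () }.size)  // 2
}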