Commit e6d1406a authored by jeanlyn, committed by Michael Armbrust

[SPARK-5498][SQL]fix query exception when partition schema does not match table schema

In Hive, the schema of a partition may differ from the table schema. When we use Spark SQL to query a partition whose schema differs from the table schema, we get the exception described in the [jira](https://issues.apache.org/jira/browse/SPARK-5498). For example:
* Take a look at the schema of the partition and of the table:

```sql
DESCRIBE partition_test PARTITION (dt='1');
id                  	int              	None
name                	string              	None
dt                  	string              	None

# Partition Information
# col_name            	data_type           	comment

dt                  	string              	None
```
```
DESCRIBE partition_test;
OK
id                  	bigint              	None
name                	string              	None
dt                  	string              	None

# Partition Information
# col_name            	data_type           	comment

dt                  	string              	None
```
* Run the SQL:
```sql
SELECT * FROM partition_test where dt='1';
```
We get the cast exception `java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableLong cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableInt`.
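
The change below addresses this by building an `ObjectInspector` converted to the table schema and converting every deserialized partition row before its fields are unwrapped. A minimal sketch of the idea (not the patched Spark code itself), assuming `partitionDeser` and `tableDeser` are already-initialized `Deserializer`s for the partition and the table:

```scala
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
import org.apache.hadoop.io.Writable

// Sketch: read rows written with the partition SerDe (e.g. key as int)
// through the table schema (e.g. key as bigint).
def convertedRows(
    values: Iterator[Writable],
    partitionDeser: Deserializer,  // assumption: initialized with the partition properties
    tableDeser: Deserializer       // assumption: initialized with the table properties
  ): Iterator[AnyRef] = {
  // Object inspector for the partition data, converted to the table schema
  // (Hive 0.13 two-argument overload; 0.12 needs an extra boolean, see the shims below).
  val soi = ObjectInspectorConverters
    .getConvertedOI(partitionDeser.getObjectInspector, tableDeser.getObjectInspector)
    .asInstanceOf[StructObjectInspector]
  // Converter that rewrites each raw row into the converted layout.
  val converter =
    ObjectInspectorConverters.getConverter(partitionDeser.getObjectInspector, soi)
  // `soi` can then be used to build the per-field unwrappers, as fillObject does.
  values.map(v => converter.convert(partitionDeser.deserialize(v)))
}
```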

Author: jeanlyn <jeanlyn92@gmail.com>

Closes #4289 from jeanlyn/schema and squashes the following commits:

9c8da74 [jeanlyn] fix style
b41d6b9 [jeanlyn] fix compile errors
07d84b6 [jeanlyn] Merge branch 'master' into schema
535b0b6 [jeanlyn] reduce conflicts
d6c93c5 [jeanlyn] fix bug
1e8b30c [jeanlyn] fix code style
0549759 [jeanlyn] fix code style
c879aa1 [jeanlyn] clean the code
2a91a87 [jeanlyn] add more test case and clean the code
12d800d [jeanlyn] fix code style
63d170a [jeanlyn] fix compile problem
7470901 [jeanlyn] reduce conflicts
afc7da5 [jeanlyn] make getConvertedOI compatible between 0.12.0 and 0.13.1
b1527d5 [jeanlyn] fix type mismatch
10744ca [jeanlyn] Insert a space after the start of the comment
3b27af3 [jeanlyn] SPARK-5498:fix bug when query the data when partition schema does not match table schema
parent 8c3b0052
......@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
......@@ -116,7 +116,7 @@ class HadoopTableReader(
val hconf = broadcastedHiveConf.value.value
val deserializer = deserializerClass.newInstance()
deserializer.initialize(hconf, tableDesc.getProperties)
HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow)
HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer)
}
deserializedHadoopRDD
......@@ -189,9 +189,13 @@ class HadoopTableReader(
val hconf = broadcastedHiveConf.value.value
val deserializer = localDeserializer.newInstance()
deserializer.initialize(hconf, partProps)
// get the table deserializer
val tableSerDe = tableDesc.getDeserializerClass.newInstance()
tableSerDe.initialize(hconf, tableDesc.getProperties)
// fill the non partition key attributes
HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, mutableRow)
HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs,
mutableRow, tableSerDe)
}
}.toSeq
......@@ -261,25 +265,36 @@ private[hive] object HadoopTableReader extends HiveInspectors {
* Transform all given raw `Writable`s into `Row`s.
*
* @param iterator Iterator of all `Writable`s to be transformed
* @param deserializer The `Deserializer` associated with the input `Writable`
* @param rawDeser The `Deserializer` associated with the input `Writable`
* @param nonPartitionKeyAttrs Attributes that should be filled together with their corresponding
* positions in the output schema
* @param mutableRow A reusable `MutableRow` that should be filled
* @param tableDeser Table Deserializer
* @return An `Iterator[Row]` transformed from `iterator`
*/
def fillObject(
iterator: Iterator[Writable],
deserializer: Deserializer,
rawDeser: Deserializer,
nonPartitionKeyAttrs: Seq[(Attribute, Int)],
mutableRow: MutableRow): Iterator[Row] = {
mutableRow: MutableRow,
tableDeser: Deserializer): Iterator[Row] = {
val soi = if (rawDeser.getObjectInspector.equals(tableDeser.getObjectInspector)) {
rawDeser.getObjectInspector.asInstanceOf[StructObjectInspector]
} else {
HiveShim.getConvertedOI(
rawDeser.getObjectInspector,
tableDeser.getObjectInspector).asInstanceOf[StructObjectInspector]
}
val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) =>
soi.getStructFieldRef(attr.name) -> ordinal
}.unzip
// Builds specific unwrappers ahead of time according to object inspector types to avoid pattern
// matching and branching costs per row.
/**
* Builds specific unwrappers ahead of time according to object inspector
* types to avoid pattern matching and branching costs per row.
*/
val unwrappers: Seq[(Any, MutableRow, Int) => Unit] = fieldRefs.map {
_.getFieldObjectInspector match {
case oi: BooleanObjectInspector =>
......@@ -316,9 +331,11 @@ private[hive] object HadoopTableReader extends HiveInspectors {
}
}
val converter = ObjectInspectorConverters.getConverter(rawDeser.getObjectInspector, soi)
// Map each tuple to a row object
iterator.map { value =>
val raw = deserializer.deserialize(value)
val raw = converter.convert(rawDeser.deserialize(value))
var i = 0
while (i < fieldRefs.length) {
val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
......
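
The rewritten doc comment on `unwrappers` above describes a pattern worth making explicit: each field's `ObjectInspector` type is matched once, and the per-row loop only invokes the pre-built closure. A simplified, self-contained sketch of that pattern (the real code fills Catalyst's `MutableRow`; `SimpleRow` here is a hypothetical stand-in):

```scala
import org.apache.hadoop.hive.serde2.objectinspector.StructField
import org.apache.hadoop.hive.serde2.objectinspector.primitive.{IntObjectInspector, LongObjectInspector, StringObjectInspector}

// Hypothetical mutable row used only for this sketch.
final class SimpleRow(size: Int) {
  private val values = new Array[Any](size)
  def setInt(i: Int, v: Int): Unit = values(i) = v
  def setLong(i: Int, v: Long): Unit = values(i) = v
  def update(i: Int, v: Any): Unit = values(i) = v
}

// Match on the field's ObjectInspector once; reuse the returned closure for every row.
def makeUnwrapper(fieldRef: StructField): (Any, SimpleRow, Int) => Unit =
  fieldRef.getFieldObjectInspector match {
    case oi: IntObjectInspector =>
      (value: Any, row: SimpleRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
    case oi: LongObjectInspector =>
      (value: Any, row: SimpleRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
    case oi: StringObjectInspector =>
      (value: Any, row: SimpleRow, ordinal: Int) => row.update(ordinal, oi.getPrimitiveJavaObject(value))
    case _ =>
      (value: Any, row: SimpleRow, ordinal: Int) => row.update(ordinal, value)
  }
```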
......@@ -32,9 +32,12 @@ import org.apache.spark.sql.hive.test.TestHive._
case class TestData(key: Int, value: String)
case class ThreeCloumntable(key: Int, value: String, key1: String)
class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
import org.apache.spark.sql.hive.test.TestHive.implicits._
val testData = TestHive.sparkContext.parallelize(
(1 to 100).map(i => TestData(i, i.toString))).toDF()
......@@ -186,4 +189,43 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
sql("DROP TABLE hiveTableWithStructValue")
}
test("SPARK-5498:partition schema does not match table schema") {
val testData = TestHive.sparkContext.parallelize(
(1 to 10).map(i => TestData(i, i.toString))).toDF()
testData.registerTempTable("testData")
val testDatawithNull = TestHive.sparkContext.parallelize(
(1 to 10).map(i => ThreeCloumntable(i, i.toString,null))).toDF()
val tmpDir = Files.createTempDir()
sql(s"CREATE TABLE table_with_partition(key int,value string) PARTITIONED by (ds string) location '${tmpDir.toURI.toString}' ")
sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') SELECT key,value FROM testData")
// test schema the same between partition and table
sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT")
checkAnswer(sql("select key,value from table_with_partition where ds='1' "),
testData.collect.toSeq
)
// test difference type of field
sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT")
checkAnswer(sql("select key,value from table_with_partition where ds='1' "),
testData.collect.toSeq
)
// add column to table
sql("ALTER TABLE table_with_partition ADD COLUMNS(key1 string)")
checkAnswer(sql("select key,value,key1 from table_with_partition where ds='1' "),
testDatawithNull.collect.toSeq
)
// change column name to table
sql("ALTER TABLE table_with_partition CHANGE COLUMN key keynew BIGINT")
checkAnswer(sql("select keynew,value from table_with_partition where ds='1' "),
testData.collect.toSeq
)
sql("DROP TABLE table_with_partition")
}
}
......@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.stats.StatsSetupConst
import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Deserializer, io => hiveIo}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, PrimitiveObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, ObjectInspector, PrimitiveObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory
import org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector, PrimitiveObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfo, TypeInfoFactory}
......@@ -210,7 +210,7 @@ private[hive] object HiveShim {
def getDataLocationPath(p: Partition) = p.getPartitionPath
def getAllPartitionsOf(client: Hive, tbl: Table) = client.getAllPartitionsForPruner(tbl)
def getAllPartitionsOf(client: Hive, tbl: Table) = client.getAllPartitionsForPruner(tbl)
def compatibilityBlackList = Seq(
"decimal_.*",
......@@ -244,6 +244,12 @@ private[hive] object HiveShim {
}
}
def getConvertedOI(
inputOI: ObjectInspector,
outputOI: ObjectInspector): ObjectInspector = {
ObjectInspectorConverters.getConvertedOI(inputOI, outputOI, true)
}
def prepareWritable(w: Writable): Writable = {
w
}
......
......@@ -17,6 +17,7 @@
package org.apache.spark.sql.hive
import java.util
import java.util.{ArrayList => JArrayList}
import java.util.Properties
import java.rmi.server.UID
......@@ -38,7 +39,7 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfo, DecimalTypeInfo, TypeInfoFactory}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.{HiveDecimalObjectInspector, PrimitiveObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.objectinspector.{PrimitiveObjectInspector, ObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, PrimitiveObjectInspector, ObjectInspector}
import org.apache.hadoop.hive.serde2.{Deserializer, ColumnProjectionUtils}
import org.apache.hadoop.hive.serde2.{io => hiveIo}
import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable
......@@ -400,7 +401,11 @@ private[hive] object HiveShim {
Decimal(hdoi.getPrimitiveJavaObject(data).bigDecimalValue(), hdoi.precision(), hdoi.scale())
}
}
def getConvertedOI(inputOI: ObjectInspector, outputOI: ObjectInspector): ObjectInspector = {
ObjectInspectorConverters.getConvertedOI(inputOI, outputOI)
}
/*
* Bug introduced in hive-0.13. AvroGenericRecordWritable has a member recordReaderID that
* is needed to initialize before serialization.
......
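
Both shims expose the same `getConvertedOI` entry point: the 0.12.0 shim supplies the extra boolean argument that Hive 0.12's `ObjectInspectorConverters.getConvertedOI` overload requires, while the 0.13.1 shim can call the two-argument overload directly. A hypothetical call site, with `partitionSerDe` and `tableSerDe` standing for initialized deserializers, looks the same against either Hive build:

```scala
// Callers depend only on HiveShim, so the same code compiles against the
// hive-0.12.0 and hive-0.13.1 profiles. The names below are illustrative.
val convertedOI = HiveShim.getConvertedOI(
  partitionSerDe.getObjectInspector,  // input: layout of the partition data
  tableSerDe.getObjectInspector)      // output: the table schema to convert to
```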