Skip to content
Snippets Groups Projects
Commit 2caaed97 authored by gatorsmile's avatar gatorsmile
Browse files

[SPARK-21767][TEST][SQL] Add Decimal Test For Avro in VersionSuite

## What changes were proposed in this pull request?
Decimal is a logical type of AVRO. We need to ensure the support of Hive's AVRO serde works well in Spark

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #18977 from gatorsmile/addAvroTest.
parent 7ab95188
No related branches found
No related tags found
No related merge requests found
......@@ -21,7 +21,6 @@ import java.io.{ByteArrayOutputStream, File, PrintStream}
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
......@@ -697,6 +696,73 @@ class VersionsSuite extends SparkFunSuite with Logging {
assert(versionSpark.table("t1").collect() === Array(Row(2)))
}
}
test(s"$version: Decimal support of Avro Hive serde") {
val tableName = "tab1"
// TODO: add the other logical types. For details, see the link:
// https://avro.apache.org/docs/1.8.1/spec.html#Logical+Types
val avroSchema =
"""{
| "name": "test_record",
| "type": "record",
| "fields": [ {
| "name": "f0",
| "type": [
| "null",
| {
| "precision": 38,
| "scale": 2,
| "type": "bytes",
| "logicalType": "decimal"
| }
| ]
| } ]
|}
""".stripMargin
Seq(true, false).foreach { isPartitioned =>
withTable(tableName) {
val partitionClause = if (isPartitioned) "PARTITIONED BY (ds STRING)" else ""
// Creates the (non-)partitioned Avro table
versionSpark.sql(
s"""
|CREATE TABLE $tableName
|$partitionClause
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
|TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
""".stripMargin
)
val errorMsg = "data type mismatch: cannot cast DecimalType(2,1) to BinaryType"
if (isPartitioned) {
val insertStmt = s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1.3"
if (version == "0.12" || version == "0.13") {
val e = intercept[AnalysisException](versionSpark.sql(insertStmt)).getMessage
assert(e.contains(errorMsg))
} else {
versionSpark.sql(insertStmt)
assert(versionSpark.table(tableName).collect() ===
versionSpark.sql("SELECT 1.30, 'a'").collect())
}
} else {
val insertStmt = s"INSERT OVERWRITE TABLE $tableName SELECT 1.3"
if (version == "0.12" || version == "0.13") {
val e = intercept[AnalysisException](versionSpark.sql(insertStmt)).getMessage
assert(e.contains(errorMsg))
} else {
versionSpark.sql(insertStmt)
assert(versionSpark.table(tableName).collect() ===
versionSpark.sql("SELECT 1.30").collect())
}
}
}
}
}
// TODO: add more tests.
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment