diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 82fbdd645ebe00bedadb8d1425ee0b0ceb67b3b3..072e538b9ed546b45e076f5bbbf5e99cff293a38 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -21,7 +21,6 @@ import java.io.{ByteArrayOutputStream, File, PrintStream} import java.net.URI import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.hadoop.mapred.TextInputFormat @@ -697,6 +696,73 @@ class VersionsSuite extends SparkFunSuite with Logging { assert(versionSpark.table("t1").collect() === Array(Row(2))) } } + + test(s"$version: Decimal support of Avro Hive serde") { + val tableName = "tab1" + // TODO: add the other logical types. For details, see the link: + // https://avro.apache.org/docs/1.8.1/spec.html#Logical+Types + val avroSchema = + """{ + | "name": "test_record", + | "type": "record", + | "fields": [ { + | "name": "f0", + | "type": [ + | "null", + | { + | "precision": 38, + | "scale": 2, + | "type": "bytes", + | "logicalType": "decimal" + | } + | ] + | } ] + |} + """.stripMargin + + Seq(true, false).foreach { isPartitioned => + withTable(tableName) { + val partitionClause = if (isPartitioned) "PARTITIONED BY (ds STRING)" else "" + // Creates the (non-)partitioned Avro table + versionSpark.sql( + s""" + |CREATE TABLE $tableName + |$partitionClause + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' + |STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' + |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema') + """.stripMargin + ) + + val errorMsg = "data type mismatch: cannot cast DecimalType(2,1) to BinaryType" + + if (isPartitioned) { + val insertStmt = s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1.3" + if (version == "0.12" || version == "0.13") { + val e = intercept[AnalysisException](versionSpark.sql(insertStmt)).getMessage + assert(e.contains(errorMsg)) + } else { + versionSpark.sql(insertStmt) + assert(versionSpark.table(tableName).collect() === + versionSpark.sql("SELECT 1.30, 'a'").collect()) + } + } else { + val insertStmt = s"INSERT OVERWRITE TABLE $tableName SELECT 1.3" + if (version == "0.12" || version == "0.13") { + val e = intercept[AnalysisException](versionSpark.sql(insertStmt)).getMessage + assert(e.contains(errorMsg)) + } else { + versionSpark.sql(insertStmt) + assert(versionSpark.table(tableName).collect() === + versionSpark.sql("SELECT 1.30").collect()) + } + } + } + } + } + // TODO: add more tests. } }