Commit 65b10ffb authored by gatorsmile's avatar gatorsmile Committed by Wenchen Fan

[SPARK-19279][SQL] Infer Schema for Hive Serde Tables and Block Creating a Hive Table With an Empty Schema

### What changes were proposed in this pull request?
So far, we allow users to create a table with an empty schema: `CREATE TABLE tab1`. Allowing this could break many code paths. Thus, we should follow Hive and block it.

For Hive serde tables, some serde libraries require a user-specified schema, which is recorded in the metastore. To get the list of such serdes, we need to check `hive.serdes.using.metastore.for.schema`, which contains the serdes that require a user-specified schema (see the sketch after this list). The default value contains:

- org.apache.hadoop.hive.ql.io.orc.OrcSerde
- org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
- org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe
- org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
- org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
- org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
- org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
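
For illustration, here is a minimal sketch (not part of the patch; the helper name and the truncated list are made up) of how a serde class can be checked against that comma-separated conf value:

```scala
// Hypothetical helper: a serde listed in hive.serdes.using.metastore.for.schema records the
// user-specified schema in the metastore; anything else (e.g. AvroSerDe) can describe itself.
def usesMetastoreForSchema(serdeClass: String, confValue: String): Boolean =
  confValue.split(",").map(_.trim).filter(_.nonEmpty).contains(serdeClass)

val defaultList =
  "org.apache.hadoop.hive.ql.io.orc.OrcSerde,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"

usesMetastoreForSchema("org.apache.hadoop.hive.ql.io.orc.OrcSerde", defaultList)    // true
usesMetastoreForSchema("org.apache.hadoop.hive.serde2.avro.AvroSerDe", defaultList) // false
```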

### How was this patch tested?
Added test cases for both Hive and data source tables

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16636 from gatorsmile/fixEmptyTableSchema.
parent 317fa750
Showing 331 additions and 194 deletions
......@@ -110,4 +110,31 @@ class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext {
df.select("features").rdd.map { case Row(d: Vector) => d }.first
df.select("features").collect
}
test("create libsvmTable table without schema") {
try {
spark.sql(
s"""
|CREATE TABLE libsvmTable
|USING libsvm
|OPTIONS (
| path '$path'
|)
""".stripMargin)
val df = spark.table("libsvmTable")
assert(df.columns(0) == "label")
assert(df.columns(1) == "features")
} finally {
spark.sql("DROP TABLE IF EXISTS libsvmTable")
}
}
test("create libsvmTable table without schema and path") {
try {
val e = intercept[IOException](spark.sql("CREATE TABLE libsvmTable USING libsvm"))
assert(e.getMessage.contains("No input path specified for libsvm data"))
} finally {
spark.sql("DROP TABLE IF EXISTS libsvmTable")
}
}
}
......@@ -30,9 +30,9 @@ import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types.{AtomicType, StructType}
/**
* Try to replaces [[UnresolvedRelation]]s with [[ResolveDataSource]].
* Try to replace [[UnresolvedRelation]]s if the plan is for a direct query on files.
*/
class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
private def maybeSQLFile(u: UnresolvedRelation): Boolean = {
sparkSession.sessionState.conf.runSQLonFile && u.tableIdentifier.database.isDefined
}
......
......@@ -115,7 +115,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
new Analyzer(catalog, conf) {
override val extendedResolutionRules =
new FindDataSourceTable(sparkSession) ::
new ResolveDataSource(sparkSession) :: Nil
new ResolveSQLOnFile(sparkSession) :: Nil
override val postHocResolutionRules =
AnalyzeCreateTable(sparkSession) ::
......
......@@ -1511,6 +1511,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
test("create a data source table without schema") {
import testImplicits._
withTempPath { tempDir =>
withTable("tab1", "tab2") {
(("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
val e = intercept[AnalysisException] { sql("CREATE TABLE tab1 USING json") }.getMessage
assert(e.contains("Unable to infer schema for JSON. It must be specified manually"))
sql(s"CREATE TABLE tab2 using json location '${tempDir.getCanonicalPath}'")
checkAnswer(spark.table("tab2"), Row("a", "b"))
}
}
}
test("create table using CLUSTERED BY without schema specification") {
import testImplicits._
withTempPath { tempDir =>
......
......@@ -62,16 +62,16 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
override val extendedResolutionRules =
catalog.ParquetConversions ::
catalog.OrcConversions ::
new DetermineHiveSerde(conf) ::
new ResolveHiveSerdeTable(sparkSession) ::
new FindDataSourceTable(sparkSession) ::
new FindHiveSerdeTable(sparkSession) ::
new ResolveDataSource(sparkSession) :: Nil
new ResolveSQLOnFile(sparkSession) :: Nil
override val postHocResolutionRules =
AnalyzeCreateTable(sparkSession) ::
PreprocessTableInsertion(conf) ::
DataSourceAnalysis(conf) ::
new HiveAnalysis(sparkSession) :: Nil
HiveAnalysis :: Nil
override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
}
......
......@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
......@@ -27,21 +27,24 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, PreprocessTableInsertion}
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.HiveSerDe
/**
* Determine the serde/format of the Hive serde table, according to the storage properties.
* Determine the database, serde/format and schema of the Hive serde table, according to the storage
* properties.
*/
class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case c @ CreateTable(t, _, query) if DDLUtils.isHiveTable(t) && t.storage.serde.isEmpty =>
if (t.bucketSpec.isDefined) {
class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
private def determineHiveSerde(table: CatalogTable): CatalogTable = {
if (table.storage.serde.nonEmpty) {
table
} else {
if (table.bucketSpec.isDefined) {
throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.")
}
val defaultStorage = HiveSerDe.getDefaultStorage(conf)
val options = new HiveOptions(t.storage.properties)
val defaultStorage = HiveSerDe.getDefaultStorage(session.sessionState.conf)
val options = new HiveOptions(table.storage.properties)
val fileStorage = if (options.fileFormat.isDefined) {
HiveSerDe.sourceToSerDe(options.fileFormat.get) match {
......@@ -67,13 +70,39 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
CatalogStorageFormat.empty
}
val storage = t.storage.copy(
val storage = table.storage.copy(
inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
properties = options.serdeProperties)
c.copy(tableDesc = t.copy(storage = storage))
table.copy(storage = storage)
}
}
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case c @ CreateTable(t, _, query) if DDLUtils.isHiveTable(t) =>
// Determines the database name if it is not specified.
val dbName = t.identifier.database.getOrElse(session.catalog.currentDatabase)
val table = t.copy(identifier = t.identifier.copy(database = Some(dbName)))
// Determines the serde/format of Hive tables
val withStorage = determineHiveSerde(table)
// Infers the schema if it is empty, because the schema could be determined by the
// Hive serde.
val catalogTable = if (query.isEmpty) {
val withSchema = HiveUtils.inferSchema(withStorage)
if (withSchema.schema.length <= 0) {
throw new AnalysisException("Unable to infer the schema. " +
s"The schema specification is required to create the table ${withSchema.identifier}.")
}
withSchema
} else {
withStorage
}
c.copy(tableDesc = catalogTable)
}
}
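
The net user-facing behavior of this rule is sketched below, assuming a Hive-enabled SparkSession `spark`; the table name is hypothetical and the error message matches the new HiveDDLSuite test further down:

```scala
// The default serde (LazySimpleSerDe) requires a user-specified schema, so an empty-schema
// CREATE TABLE no longer passes analysis.
spark.sql("CREATE TABLE tab1 USING hive")
// AnalysisException: Unable to infer the schema. The schema specification is required to
// create the table `default`.`tab1`.

// With an explicit column list the same statement succeeds.
spark.sql("CREATE TABLE tab1 (id INT, name STRING) USING hive")
```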
......@@ -82,17 +111,13 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
*
* Note that, this rule must be run after `PreprocessTableInsertion`.
*/
class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
object HiveAnalysis extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) =>
InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
val dbName = tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
CreateHiveTableAsSelectCommand(
tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
query,
mode)
CreateHiveTableAsSelectCommand(tableDesc, query, mode)
}
}
......
......@@ -24,18 +24,25 @@ import java.sql.Timestamp
import java.util.concurrent.TimeUnit
import scala.collection.mutable.HashMap
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
import org.apache.hadoop.util.VersionInfo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, CatalogTableType}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf._
......@@ -455,4 +462,133 @@ private[spark] object HiveUtils extends Logging {
case (decimal, DecimalType()) => decimal.toString
case (other, tpe) if primitiveTypes contains tpe => other.toString
}
/** Converts the native StructField to Hive's FieldSchema. */
private def toHiveColumn(c: StructField): FieldSchema = {
val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
c.metadata.getString(HiveUtils.hiveTypeString)
} else {
c.dataType.catalogString
}
new FieldSchema(c.name, typeString, c.getComment.orNull)
}
/** Builds the native StructField from Hive's FieldSchema. */
private def fromHiveColumn(hc: FieldSchema): StructField = {
val columnType = try {
CatalystSqlParser.parseDataType(hc.getType)
} catch {
case e: ParseException =>
throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
}
val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
val field = StructField(
name = hc.getName,
dataType = columnType,
nullable = true,
metadata = metadata)
Option(hc.getComment).map(field.withComment).getOrElse(field)
}
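
As a small illustration of the round trip performed by `toHiveColumn`/`fromHiveColumn` (a sketch; the type string is an arbitrary example):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// A Hive type string read from the metastore is parsed into a Catalyst DataType ...
val dt = CatalystSqlParser.parseDataType("struct<f10:int,f11:double>")
// ... and catalogString is what gets written back when no original spelling is preserved
// in the column metadata under HiveUtils.hiveTypeString.
assert(dt.catalogString == "struct<f10:int,f11:double>")
```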
// TODO: merge this with HiveClientImpl#toHiveTable
/** Converts the native table metadata representation format CatalogTable to Hive's Table. */
def toHiveTable(catalogTable: CatalogTable): HiveTable = {
// We start by constructing an API table as Hive performs several important transformations
// internally when converting an API table to a QL table.
val tTable = new org.apache.hadoop.hive.metastore.api.Table()
tTable.setTableName(catalogTable.identifier.table)
tTable.setDbName(catalogTable.database)
val tableParameters = new java.util.HashMap[String, String]()
tTable.setParameters(tableParameters)
catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }
tTable.setTableType(catalogTable.tableType match {
case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE.toString
case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString
case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString
})
val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
tTable.setSd(sd)
// Note: In Hive the schema and partition columns must be disjoint sets
val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
catalogTable.partitionColumnNames.contains(c.getName)
}
sd.setCols(schema.asJava)
tTable.setPartitionKeys(partCols.asJava)
catalogTable.storage.locationUri.foreach(sd.setLocation)
catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
sd.setSerdeInfo(serdeInfo)
val serdeParameters = new java.util.HashMap[String, String]()
catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
serdeInfo.setParameters(serdeParameters)
new HiveTable(tTable)
}
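
To make the disjoint-sets note above concrete, here is a minimal sketch of the same split on a hypothetical table `(a INT, b STRING, ds STRING)` partitioned by `ds`:

```scala
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", StringType),
  StructField("ds", StringType)))
val partitionColumnNames = Seq("ds")

// Hive keeps partition keys separate from the storage descriptor columns, so the two sets
// must not overlap.
val (partCols, dataCols) = schema.partition(f => partitionColumnNames.contains(f.name))
// partCols -> tTable.setPartitionKeys(...); dataCols -> sd.setCols(...)
```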
/**
* Converts the native partition metadata representation format CatalogTablePartition to
* Hive's Partition.
*/
def toHivePartition(
catalogTable: CatalogTable,
hiveTable: HiveTable,
partition: CatalogTablePartition): HivePartition = {
val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
tPartition.setDbName(catalogTable.database)
tPartition.setTableName(catalogTable.identifier.table)
tPartition.setValues(catalogTable.partitionColumnNames.map(partition.spec(_)).asJava)
val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
tPartition.setSd(sd)
// Note: In Hive the schema and partition columns must be disjoint sets
val schema = catalogTable.schema.map(toHiveColumn).filter { c =>
!catalogTable.partitionColumnNames.contains(c.getName)
}
sd.setCols(schema.asJava)
partition.storage.locationUri.foreach(sd.setLocation)
partition.storage.inputFormat.foreach(sd.setInputFormat)
partition.storage.outputFormat.foreach(sd.setOutputFormat)
val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
sd.setSerdeInfo(serdeInfo)
// maps and lists should be set only after all elements are ready (see HIVE-7975)
partition.storage.serde.foreach(serdeInfo.setSerializationLib)
val serdeParameters = new java.util.HashMap[String, String]()
catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
partition.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
serdeInfo.setParameters(serdeParameters)
new HivePartition(hiveTable, tPartition)
}
/**
* Infers the schema for Hive serde tables and returns the CatalogTable with the inferred schema.
* When the table is a data source table or the schema is already defined, returns the original
* CatalogTable.
*/
def inferSchema(table: CatalogTable): CatalogTable = {
if (DDLUtils.isDatasourceTable(table) || table.schema.nonEmpty) {
table
} else {
val hiveTable = toHiveTable(table)
// Note: Hive separates partition columns and the schema, but for us the
// partition columns are part of the schema
val partCols = hiveTable.getPartCols.asScala.map(fromHiveColumn)
val schema = StructType(hiveTable.getCols.asScala.map(fromHiveColumn) ++ partCols)
table.copy(schema = schema)
}
}
}
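
In practice, the new `inferSchema` path means a Hive serde table whose serde can describe its own columns no longer needs an explicit column list. A rough sketch mirroring the SPARK-13709 test further below (assuming a Hive-enabled SparkSession `spark` and an `avroSchema` literal like the one in that test):

```scala
// Sketch only: no column list is given, so ResolveHiveSerdeTable calls HiveUtils.inferSchema
// and the columns are derived by the Avro serde from 'avro.schema.literal'.
spark.sql(
  s"""CREATE TABLE avro_tab
     |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
     |STORED AS
     |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
     |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
     |TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
   """.stripMargin)
spark.table("avro_tab").printSchema()  // expected: f0 (int), f1 (struct<f10:int,f11:double>)
```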
......@@ -19,13 +19,9 @@ package org.apache.spark.sql.hive
import java.io.IOException
import scala.collection.JavaConverters._
import com.google.common.base.Objects
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc
......@@ -60,57 +56,7 @@ private[hive] case class MetastoreRelation(
override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
private def toHiveColumn(c: StructField): FieldSchema = {
val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
c.metadata.getString(HiveUtils.hiveTypeString)
} else {
c.dataType.catalogString
}
new FieldSchema(c.name, typeString, c.getComment.orNull)
}
// TODO: merge this with HiveClientImpl#toHiveTable
@transient val hiveQlTable: HiveTable = {
// We start by constructing an API table as Hive performs several important transformations
// internally when converting an API table to a QL table.
val tTable = new org.apache.hadoop.hive.metastore.api.Table()
tTable.setTableName(catalogTable.identifier.table)
tTable.setDbName(catalogTable.database)
val tableParameters = new java.util.HashMap[String, String]()
tTable.setParameters(tableParameters)
catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) }
tTable.setTableType(catalogTable.tableType match {
case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE.toString
case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString
case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString
})
val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
tTable.setSd(sd)
// Note: In Hive the schema and partition columns must be disjoint sets
val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c =>
catalogTable.partitionColumnNames.contains(c.getName)
}
sd.setCols(schema.asJava)
tTable.setPartitionKeys(partCols.asJava)
catalogTable.storage.locationUri.foreach(sd.setLocation)
catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
sd.setSerdeInfo(serdeInfo)
val serdeParameters = new java.util.HashMap[String, String]()
catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
serdeInfo.setParameters(serdeParameters)
new HiveTable(tTable)
}
@transient val hiveQlTable: HiveTable = HiveUtils.toHiveTable(catalogTable)
@transient override def computeStats(conf: CatalystConf): Statistics = {
catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics(
......@@ -165,38 +111,7 @@ private[hive] case class MetastoreRelation(
} else {
allPartitions
}
rawPartitions.map { p =>
val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
tPartition.setDbName(databaseName)
tPartition.setTableName(tableName)
tPartition.setValues(partitionKeys.map(a => p.spec(a.name)).asJava)
val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
tPartition.setSd(sd)
// Note: In Hive the schema and partition columns must be disjoint sets
val schema = catalogTable.schema.map(toHiveColumn).filter { c =>
!catalogTable.partitionColumnNames.contains(c.getName)
}
sd.setCols(schema.asJava)
p.storage.locationUri.foreach(sd.setLocation)
p.storage.inputFormat.foreach(sd.setInputFormat)
p.storage.outputFormat.foreach(sd.setOutputFormat)
val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
sd.setSerdeInfo(serdeInfo)
// maps and lists should be set only after all elements are ready (see HIVE-7975)
p.storage.serde.foreach(serdeInfo.setSerializationLib)
val serdeParameters = new java.util.HashMap[String, String]()
catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
p.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) }
serdeInfo.setParameters(serdeParameters)
new Partition(hiveQlTable, tPartition)
}
rawPartitions.map(HiveUtils.toHivePartition(catalogTable, hiveQlTable, _))
}
/** Only compare database and tablename, not alias. */
......
......@@ -68,82 +68,4 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
sql("DROP TABLE IF EXISTS createAndInsertTest")
}
}
test("SPARK-13709: reading partitioned Avro table with nested schema") {
withTempDir { dir =>
val path = dir.toURI.toString
val tableName = "spark_13709"
val tempTableName = "spark_13709_temp"
new File(dir.getAbsolutePath, tableName).mkdir()
new File(dir.getAbsolutePath, tempTableName).mkdir()
val avroSchema =
"""{
| "name": "test_record",
| "type": "record",
| "fields": [ {
| "name": "f0",
| "type": "int"
| }, {
| "name": "f1",
| "type": {
| "type": "record",
| "name": "inner",
| "fields": [ {
| "name": "f10",
| "type": "int"
| }, {
| "name": "f11",
| "type": "double"
| } ]
| }
| } ]
|}
""".stripMargin
withTable(tableName, tempTableName) {
// Creates the external partitioned Avro table to be tested.
sql(
s"""CREATE EXTERNAL TABLE $tableName
|PARTITIONED BY (ds STRING)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
|LOCATION '$path/$tableName'
|TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
""".stripMargin
)
// Creates an temporary Avro table used to prepare testing Avro file.
sql(
s"""CREATE EXTERNAL TABLE $tempTableName
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
|LOCATION '$path/$tempTableName'
|TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
""".stripMargin
)
// Generates Avro data.
sql(s"INSERT OVERWRITE TABLE $tempTableName SELECT 1, STRUCT(2, 2.5)")
// Adds generated Avro data as a new partition to the testing table.
sql(s"ALTER TABLE $tableName ADD PARTITION (ds = 'foo') LOCATION '$path/$tempTableName'")
// The following query fails before SPARK-13709 is fixed. This is because when reading data
// from table partitions, Avro deserializer needs the Avro schema, which is defined in
// table property "avro.schema.literal". However, we only initializes the deserializer using
// partition properties, which doesn't include the wanted property entry. Merging two sets
// of properties solves the problem.
checkAnswer(
sql(s"SELECT * FROM $tableName"),
Row(1, Row(2, 2.5D), "foo")
)
}
}
}
}
......@@ -24,9 +24,8 @@ import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException}
import org.apache.spark.sql.catalyst.catalog._
......@@ -47,7 +46,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
* is not fully tested.
*/
@ExtendedHiveTest
class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with Logging {
private val clientBuilder = new HiveClientBuilder
import clientBuilder.buildClient
......@@ -571,6 +570,85 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet
}
}
test(s"$version: SPARK-13709: reading partitioned Avro table with nested schema") {
withTempDir { dir =>
val path = dir.toURI.toString
val tableName = "spark_13709"
val tempTableName = "spark_13709_temp"
new File(dir.getAbsolutePath, tableName).mkdir()
new File(dir.getAbsolutePath, tempTableName).mkdir()
val avroSchema =
"""{
| "name": "test_record",
| "type": "record",
| "fields": [ {
| "name": "f0",
| "type": "int"
| }, {
| "name": "f1",
| "type": {
| "type": "record",
| "name": "inner",
| "fields": [ {
| "name": "f10",
| "type": "int"
| }, {
| "name": "f11",
| "type": "double"
| } ]
| }
| } ]
|}
""".stripMargin
withTable(tableName, tempTableName) {
// Creates the external partitioned Avro table to be tested.
sql(
s"""CREATE EXTERNAL TABLE $tableName
|PARTITIONED BY (ds STRING)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
|LOCATION '$path/$tableName'
|TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
""".stripMargin
)
// Creates a temporary Avro table used to prepare the testing Avro file.
sql(
s"""CREATE EXTERNAL TABLE $tempTableName
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
|STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
|LOCATION '$path/$tempTableName'
|TBLPROPERTIES ('avro.schema.literal' = '$avroSchema')
""".stripMargin
)
// Generates Avro data.
sql(s"INSERT OVERWRITE TABLE $tempTableName SELECT 1, STRUCT(2, 2.5)")
// Adds generated Avro data as a new partition to the testing table.
sql(s"ALTER TABLE $tableName ADD PARTITION (ds = 'foo') LOCATION '$path/$tempTableName'")
// The following query fails before SPARK-13709 is fixed. This is because when reading
// data from table partitions, the Avro deserializer needs the Avro schema, which is
// defined in the table property "avro.schema.literal". However, we used to initialize
// the deserializer using only partition properties, which do not include the wanted
// property entry. Merging the two sets of properties solves the problem.
checkAnswer(
sql(s"SELECT * FROM $tableName"),
Row(1, Row(2, 2.5D), "foo")
)
}
}
}
// TODO: add more tests.
}
}
......@@ -79,6 +79,25 @@ class HiveDDLSuite
}
}
test("create a hive table without schema") {
import testImplicits._
withTempPath { tempDir =>
withTable("tab1", "tab2") {
(("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
var e = intercept[AnalysisException] { sql("CREATE TABLE tab1 USING hive") }.getMessage
assert(e.contains("Unable to infer the schema. The schema specification is required to " +
"create the table `default`.`tab1`"))
e = intercept[AnalysisException] {
sql(s"CREATE TABLE tab2 location '${tempDir.getCanonicalPath}'")
}.getMessage
assert(e.contains("Unable to infer the schema. The schema specification is required to " +
"create the table `default`.`tab2`"))
}
}
}
test("drop external tables in default database") {
withTempDir { tmpDir =>
val tabName = "tab1"
......@@ -199,7 +218,7 @@ class HiveDDLSuite
val e = intercept[AnalysisException] {
sql("CREATE TABLE tbl(a int) PARTITIONED BY (a string)")
}
assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
assert(e.message == "Found duplicate column(s) in table definition of `default`.`tbl`: a")
}
test("add/drop partition with location - managed table") {
......@@ -1192,7 +1211,7 @@ class HiveDDLSuite
assert(e2.getMessage.contains(forbiddenPrefix + "foo"))
val e3 = intercept[AnalysisException] {
sql(s"CREATE TABLE tbl TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
sql(s"CREATE TABLE tbl (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
}
assert(e3.getMessage.contains(forbiddenPrefix + "foo"))
}
......