Commit 3f03c90a authored by Wenchen Fan, committed by Reynold Xin

[SPARK-18220][SQL] read Hive orc table with varchar column should not fail

## What changes were proposed in this pull request?

Spark SQL only has `StringType`, so when reading a Hive table with a varchar column, we read that column as `StringType`. However, we still need to use a varchar `ObjectInspector` to read the varchar column from the Hive table, which means we need to know the actual column type on the Hive side.

In Spark 2.1, after https://github.com/apache/spark/pull/14363, we parse the Hive type string into a Catalyst type, which means the actual column type on the Hive side is erased. We may then use a string `ObjectInspector` to read the varchar column and fail.
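
The erasure can be seen directly from the parser. A minimal sketch, assuming the Spark 2.1 behavior of mapping `char`/`varchar` type strings to `StringType`:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Hive reports the column as "varchar(10)", but parsing that string into a
// Catalyst type yields plain StringType, so the fact that it was a varchar
// (and its length) is no longer visible when Hive ObjectInspectors are built.
val catalystType = CatalystSqlParser.parseDataType("varchar(10)")
// catalystType == StringType in Spark 2.1
```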

This PR keeps the original Hive column type string in the metadata of `StructField`, and uses it when converting the field back to a Hive column.
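
Roughly, the round trip works as shown in this condensed sketch of the changes below (same metadata key as the patch, but not the literal code):

```scala
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField}

// Key under which the raw Hive type string is stored (HiveUtils.hiveTypeString).
val hiveTypeString = "HIVE_TYPE_STRING"

// fromHiveColumn: remember the raw Hive type ("varchar(10)") alongside the
// erased Catalyst type (StringType).
val metadata = new MetadataBuilder().putString(hiveTypeString, "varchar(10)").build()
val field = StructField("a", StringType, nullable = true, metadata = metadata)

// toHiveColumn: prefer the preserved Hive type string when present, so Hive
// picks the varchar ObjectInspector instead of the string one.
val typeString = if (field.metadata.contains(hiveTypeString)) {
  field.metadata.getString(hiveTypeString)
} else {
  field.dataType.catalogString
}
// typeString == "varchar(10)"
```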

## How was this patch tested?

A newly added regression test (see the `OrcSuite` change below).

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16060 from cloud-fan/varchar.
parent c24076dc
@@ -54,6 +54,14 @@ private[spark] object HiveUtils extends Logging {
   /** The version of hive used internally by Spark SQL. */
   val hiveExecutionVersion: String = "1.2.1"
 
+  /**
+   * The property key that is used to store the raw hive type string in the metadata of StructField.
+   * For example, in the case where the Hive type is varchar, the type gets mapped to a string type
+   * in Spark SQL, but we need to preserve the original type in order to invoke the correct object
+   * inspector in Hive.
+   */
+  val hiveTypeString: String = "HIVE_TYPE_STRING"
+
   val HIVE_METASTORE_VERSION = SQLConfigBuilder("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
       s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
@@ -61,7 +61,12 @@ private[hive] case class MetastoreRelation(
   override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    new FieldSchema(c.name, c.dataType.catalogString, c.getComment.orNull)
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment.orNull)
   }
 
   // TODO: merge this with HiveClientImpl#toHiveTable
@@ -46,7 +46,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -748,7 +749,12 @@ private[hive] class HiveClientImpl(
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    new FieldSchema(c.name, c.dataType.catalogString, c.getComment().orNull)
+    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
+      c.metadata.getString(HiveUtils.hiveTypeString)
+    } else {
+      c.dataType.catalogString
+    }
+    new FieldSchema(c.name, typeString, c.getComment().orNull)
   }
 
   private def fromHiveColumn(hc: FieldSchema): StructField = {
@@ -758,10 +764,13 @@ private[hive] class HiveClientImpl(
       case e: ParseException =>
         throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
     }
+
+    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
     val field = StructField(
       name = hc.getName,
       dataType = columnType,
-      nullable = true)
+      nullable = true,
+      metadata = metadata)
     Option(hc.getComment).map(field.withComment).getOrElse(field)
   }
 
@@ -205,7 +205,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
   test("make sure we can read table created by old version of Spark") {
     for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
       val readBack = getTableMetadata(tbl.identifier.table)
-      assert(readBack.schema == expectedSchema)
+      assert(readBack.schema.sameType(expectedSchema))
 
       if (tbl.tableType == CatalogTableType.EXTERNAL) {
         // trim the URI prefix
@@ -235,7 +235,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
 
       sql(s"ALTER TABLE ${tbl.identifier} RENAME TO $newName")
       val readBack = getTableMetadata(newName)
-      assert(readBack.schema == expectedSchema)
+      assert(readBack.schema.sameType(expectedSchema))
 
       // trim the URI prefix
       val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -150,6 +151,17 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
     assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
   }
+
+  test("SPARK-18220: read Hive orc table with varchar column") {
+    val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    try {
+      hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS orc")
+      hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM (SELECT 1) t")
+      checkAnswer(spark.table("orc_varchar"), Row("a"))
+    } finally {
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS orc_varchar")
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite {