diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 06f0f5b67f220e3eaef4a143a0b2bfc480dd17a3..a3b39a8540656eb61259bd7c6310120cf81e38af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -76,7 +76,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
   }
 
   override def visitSingleDataType(ctx: SingleDataTypeContext): DataType = withOrigin(ctx) {
-    visit(ctx.dataType).asInstanceOf[DataType]
+    visitSparkDataType(ctx.dataType)
   }
 
   /* ********************************************************************************************
@@ -997,7 +997,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    * Create a [[Cast]] expression.
    */
   override def visitCast(ctx: CastContext): Expression = withOrigin(ctx) {
-    Cast(expression(ctx.expression), typedVisit(ctx.dataType))
+    Cast(expression(ctx.expression), visitSparkDataType(ctx.dataType))
   }
 
   /**
@@ -1415,6 +1415,13 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
   /* ********************************************************************************************
    * DataType parsing
    * ******************************************************************************************** */
+  /**
+   * Create a Spark DataType.
+   */
+  private def visitSparkDataType(ctx: DataTypeContext): DataType = {
+    HiveStringType.replaceCharType(typedVisit(ctx))
+  }
+
   /**
    * Resolve/create a primitive type.
    */
@@ -1429,8 +1436,9 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
       case ("double", Nil) => DoubleType
       case ("date", Nil) => DateType
       case ("timestamp", Nil) => TimestampType
-      case ("char" | "varchar" | "string", Nil) => StringType
-      case ("char" | "varchar", _ :: Nil) => StringType
+      case ("string", Nil) => StringType
+      case ("char", length :: Nil) => CharType(length.getText.toInt)
+      case ("varchar", length :: Nil) => VarcharType(length.getText.toInt)
       case ("binary", Nil) => BinaryType
       case ("decimal", Nil) => DecimalType.USER_DEFAULT
       case ("decimal", precision :: Nil) => DecimalType(precision.getText.toInt, 0)
@@ -1452,7 +1460,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
       case SqlBaseParser.MAP =>
         MapType(typedVisit(ctx.dataType(0)), typedVisit(ctx.dataType(1)))
       case SqlBaseParser.STRUCT =>
-        createStructType(ctx.complexColTypeList())
+        StructType(Option(ctx.complexColTypeList).toSeq.flatMap(visitComplexColTypeList))
     }
   }
 
@@ -1471,12 +1479,28 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
   }
 
   /**
-   * Create a [[StructField]] from a column definition.
+   * Create a top level [[StructField]] from a column definition.
    */
   override def visitColType(ctx: ColTypeContext): StructField = withOrigin(ctx) {
     import ctx._
-    val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true)
-    if (STRING == null) structField else structField.withComment(string(STRING))
+
+    val builder = new MetadataBuilder
+    // Add comment to metadata
+    if (STRING != null) {
+      builder.putString("comment", string(STRING))
+    }
+    // Add Hive type string to metadata.
+    val rawDataType = typedVisit[DataType](ctx.dataType)
+    val cleanedDataType = HiveStringType.replaceCharType(rawDataType)
+    if (rawDataType != cleanedDataType) {
+      builder.putString(HIVE_TYPE_STRING, rawDataType.catalogString)
+    }
+
+    StructField(
+      identifier.getText,
+      cleanedDataType,
+      nullable = true,
+      builder.build())
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/HiveStringType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/HiveStringType.scala
new file mode 100644
index 0000000000000000000000000000000000000000..b319eb70bc13c9e01412fa17fb8885e27178f234
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/HiveStringType.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A Hive string type for compatibility. These data types should only be used for parsing,
+ * and should NOT be used anywhere else. Any instance of these data types should be
+ * replaced by a [[StringType]] before analysis.
+ */
+sealed abstract class HiveStringType extends AtomicType {
+  private[sql] type InternalType = UTF8String
+
+  private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized {
+    typeTag[InternalType]
+  }
+
+  override def defaultSize: Int = length
+
+  private[spark] override def asNullable: HiveStringType = this
+
+  def length: Int
+}
+
+object HiveStringType {
+  def replaceCharType(dt: DataType): DataType = dt match {
+    case ArrayType(et, nullable) =>
+      ArrayType(replaceCharType(et), nullable)
+    case MapType(kt, vt, nullable) =>
+      MapType(replaceCharType(kt), replaceCharType(vt), nullable)
+    case StructType(fields) =>
+      StructType(fields.map { field =>
+        field.copy(dataType = replaceCharType(field.dataType))
+      })
+    case _: HiveStringType => StringType
+    case _ => dt
+  }
+}
+
+/**
+ * Hive char type.
+ */
+case class CharType(length: Int) extends HiveStringType {
+  override def simpleString: String = s"char($length)"
+}
+
+/**
+ * Hive varchar type.
+ */
+case class VarcharType(length: Int) extends HiveStringType {
+  override def simpleString: String = s"varchar($length)"
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala
index 346a51ea10c8284dd0fa165c7c123e6f74fbf06f..f29cbc2069e3970e51774a3e42be5760eca312fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala
@@ -21,4 +21,12 @@ package org.apache.spark.sql
  * Contains a type system for attributes produced by relations, including complex types like
  * structs, arrays and maps.
  */
-package object types
+package object types {
+  /**
+   * Metadata key used to store the raw hive type string in the metadata of StructField. This
+   * is relevant for datatypes that do not have a direct Spark SQL counterpart, such as CHAR and
+   * VARCHAR. We need to preserve the original type in order to invoke the correct object
+   * inspector in Hive.
+   */
+  val HIVE_TYPE_STRING = "HIVE_TYPE_STRING"
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index 86bcb4d4b00c1299a084b6880f3ef9b7b5943692..eaa5fb30edfa5d853822cb30ec8819cc6cb000b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -203,6 +203,9 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
     (2 to 10).map(i => Row(i, i - 1)).toSeq)
 
   test("Schema and all fields") {
+    def hiveMetadata(dt: String): Metadata = {
+      new MetadataBuilder().putString(HIVE_TYPE_STRING, dt).build()
+    }
     val expectedSchema = StructType(
       StructField("string$%Field", StringType, true) ::
       StructField("binaryField", BinaryType, true) ::
@@ -217,8 +220,8 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
       StructField("decimalField2", DecimalType(9, 2), true) ::
       StructField("dateField", DateType, true) ::
       StructField("timestampField", TimestampType, true) ::
-      StructField("varcharField", StringType, true) ::
-      StructField("charField", StringType, true) ::
+      StructField("varcharField", StringType, true, hiveMetadata("varchar(12)")) ::
+      StructField("charField", StringType, true, hiveMetadata("char(18)")) ::
       StructField("arrayFieldSimple", ArrayType(IntegerType), true) ::
       StructField("arrayFieldComplex",
         ArrayType(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 26b1994308f5d2326587d68068b6c263fb80614f..81cd65c3cc337b98279425cbfcaa5e67aecc2759 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -54,14 +54,6 @@ private[spark] object HiveUtils extends Logging {
   /** The version of hive used internally by Spark SQL. */
   val hiveExecutionVersion: String = "1.2.1"
 
-  /**
-   * The property key that is used to store the raw hive type string in the metadata of StructField.
-   * For example, in the case where the Hive type is varchar, the type gets mapped to a string type
-   * in Spark SQL, but we need to preserve the original type in order to invoke the correct object
-   * inspector in Hive.
-   */
-  val hiveTypeString: String = "HIVE_TYPE_STRING"
-
   val HIVE_METASTORE_VERSION = SQLConfigBuilder("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
       s"<code>0.12.0</code> through <code>$hiveExecutionVersion</code>.")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 3bbac05a79c234dacaaedec95936a72f7cda3491..8f40a59fc15e9dbe35073a677455956bbccebd5a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -35,8 +35,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.types._
 
 
 private[hive] case class MetastoreRelation(
@@ -61,8 +60,8 @@ private[hive] case class MetastoreRelation(
   override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
-      c.metadata.getString(HiveUtils.hiveTypeString)
+    val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
+      c.metadata.getString(HIVE_TYPE_STRING)
     } else {
       c.dataType.catalogString
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index a9ca1a42495137150674a99e96c47a1b3ea2cb8f..9b3f29970e8ad01691a6a816adf64d9f98fb26a3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -777,8 +777,8 @@ private[hive] class HiveClientImpl(
       .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
 
   private def toHiveColumn(c: StructField): FieldSchema = {
-    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
-      c.metadata.getString(HiveUtils.hiveTypeString)
+    val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
+      c.metadata.getString(HIVE_TYPE_STRING)
     } else {
       c.dataType.catalogString
     }
@@ -793,7 +793,7 @@ private[hive] class HiveClientImpl(
       throw new SparkException("Cannot recognize hive type string: " + hc.getType, e)
     }
 
-    val metadata = new MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
+    val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build()
     val field = StructField(
       name = hc.getName,
       dataType = columnType,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 2b404690510cdd1fd225e92a596d1ee31a47a71a..aa60a3fd4f477d19e50a33e6e423ca4aa9ab61b5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -154,12 +154,43 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll {
 
   test("SPARK-18220: read Hive orc table with varchar column") {
     val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    val location = Utils.createTempDir()
+    val uri = location.toURI
     try {
-      hiveClient.runSqlHive("CREATE TABLE orc_varchar(a VARCHAR(10)) STORED AS orc")
-      hiveClient.runSqlHive("INSERT INTO TABLE orc_varchar SELECT 'a' FROM (SELECT 1) t")
-      checkAnswer(spark.table("orc_varchar"), Row("a"))
+      hiveClient.runSqlHive(
+        """
+          |CREATE EXTERNAL TABLE hive_orc(
+          |  a STRING,
+          |  b CHAR(10),
+          |  c VARCHAR(10),
+          |  d ARRAY<CHAR(3)>)
+          |STORED AS orc""".stripMargin)
+      // Hive throws an exception if we assign the location in the CREATE TABLE statement.
+      hiveClient.runSqlHive(
+        s"ALTER TABLE hive_orc SET LOCATION '$uri'")
+      hiveClient.runSqlHive(
+        """INSERT INTO TABLE hive_orc
+          |SELECT 'a', 'b', 'c', ARRAY(CAST('d' AS CHAR(3)))
+          |FROM (SELECT 1) t""".stripMargin)
+
+      // We create a different table in Spark using the same schema, which points
+      // to the same location.
+      spark.sql(
+        s"""
+           |CREATE EXTERNAL TABLE spark_orc(
+           |  a STRING,
+           |  b CHAR(10),
+           |  c VARCHAR(10),
+           |  d ARRAY<CHAR(3)>)
+           |STORED AS orc
+           |LOCATION '$uri'""".stripMargin)
+      val result = Row("a", "b         ", "c", Seq("d  "))
+      checkAnswer(spark.table("hive_orc"), result)
+      checkAnswer(spark.table("spark_orc"), result)
     } finally {
-      hiveClient.runSqlHive("DROP TABLE IF EXISTS orc_varchar")
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS hive_orc")
+      hiveClient.runSqlHive("DROP TABLE IF EXISTS spark_orc")
+      Utils.deleteRecursively(location)
     }
   }
 }
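
The patch hinges on three moves: the parser now produces dedicated CharType/VarcharType nodes, HiveStringType.replaceCharType rewrites them to StringType before analysis, and visitColType stashes the raw type string in StructField metadata under HIVE_TYPE_STRING, so that toHiveColumn can later hand the original char(n)/varchar(n) back to Hive and pick the correct object inspector. The standalone Scala sketch below illustrates that flow with simplified stand-in types; SketchType, SketchChar, toField, and the other names here are hypothetical, chosen for illustration, and are not Spark API.

// A minimal, self-contained sketch of the char/varchar flow above.
// All "Sketch*" names are hypothetical stand-ins, not Spark classes.
sealed trait SketchType { def catalogString: String }
case object SketchString extends SketchType { val catalogString = "string" }
case class SketchChar(length: Int) extends SketchType {
  def catalogString: String = s"char($length)"
}
case class SketchVarchar(length: Int) extends SketchType {
  def catalogString: String = s"varchar($length)"
}
case class SketchArray(element: SketchType) extends SketchType {
  def catalogString: String = s"array<${element.catalogString}>"
}
// Stand-in for StructField: a name, a cleaned type, and a metadata map.
case class SketchField(name: String, dataType: SketchType, metadata: Map[String, String])

object Sketch {
  val HIVE_TYPE_STRING = "HIVE_TYPE_STRING"

  // Like HiveStringType.replaceCharType: recursively rewrite char/varchar to string.
  def replaceCharType(dt: SketchType): SketchType = dt match {
    case SketchArray(et)                  => SketchArray(replaceCharType(et))
    case _: SketchChar | _: SketchVarchar => SketchString
    case _                                => dt
  }

  // Like visitColType: record the raw Hive type string only when cleaning changed the type.
  def toField(name: String, raw: SketchType): SketchField = {
    val cleaned = replaceCharType(raw)
    val metadata =
      if (raw != cleaned) Map(HIVE_TYPE_STRING -> raw.catalogString)
      else Map.empty[String, String]
    SketchField(name, cleaned, metadata)
  }

  def main(args: Array[String]): Unit = {
    val field = toField("charField", SketchArray(SketchChar(3)))
    println(field.dataType.catalogString)      // array<string>  (what Spark analysis sees)
    println(field.metadata(HIVE_TYPE_STRING))  // array<char(3)> (what Hive gets back)
  }
}

Note that toField records the metadata entry only when cleaning actually changed the type, mirroring the `if (rawDataType != cleanedDataType)` guard in visitColType: plain string columns carry no extra metadata, which is also why the ORC test above can round-trip CHAR(10) and VARCHAR(10) columns between a Hive-created and a Spark-created table over the same location.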