From 3881f342b49efdb1e0d5ee27f616451ea1928c5d Mon Sep 17 00:00:00 2001
From: windpiger <songjun@outlook.com>
Date: Sat, 11 Feb 2017 22:21:14 -0800
Subject: [PATCH] [SPARK-19448][SQL] optimize some duplication functions between HiveClientImpl and HiveUtils
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What changes were proposed in this pull request?

Several helper functions are duplicated between `HiveClientImpl` and `HiveUtils`, such as `toHiveTable`, `toHivePartition`, and `fromHivePartition`. This patch merges them into one place, the `HiveClientImpl` companion object. It also renames `MetastoreRelation.attributes` to `MetastoreRelation.dataColKeys`:
https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala#L234

A usage sketch of the consolidated helpers is appended after the patch.

## How was this patch tested?

N/A

Author: windpiger <songjun@outlook.com>

Closes #16787 from windpiger/todoInMetaStoreRelation.
---
 .../org/apache/spark/sql/hive/HiveUtils.scala | 127 +-----------------
 .../spark/sql/hive/MetastoreRelation.scala    |  13 +-
 .../sql/hive/client/HiveClientImpl.scala      |  70 ++++++----
 .../hive/execution/HiveTableScanExec.scala    |   2 +-
 ...nalCatalogBackwardCompatibilitySuite.scala |   5 +
 .../sql/hive/MetastoreDataSourcesSuite.scala  |   3 +
 .../spark/sql/hive/client/VersionsSuite.scala |  25 +++-
 7 files changed, 88 insertions(+), 157 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 13ab4e88e8..afc2bf8533 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -31,17 +31,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.hadoop.util.VersionInfo
-import org.apache.spark.{SparkConf, SparkContext, SparkException}
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, CatalogTableType}
-import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.internal.SQLConf
@@ -455,117 +451,6 @@ private[spark] object HiveUtils extends Logging {
     case (other, tpe) if primitiveTypes contains tpe => other.toString
   }
-  /** Converts the native StructField to Hive's FieldSchema. */
-  private def toHiveColumn(c: StructField): FieldSchema = {
-    val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) {
-      c.metadata.getString(HIVE_TYPE_STRING)
-    } else {
-      c.dataType.catalogString
-    }
-    new FieldSchema(c.name, typeString, c.getComment.orNull)
-  }
-
-  /** Builds the native StructField from Hive's FieldSchema.
*/ - private def fromHiveColumn(hc: FieldSchema): StructField = { - val columnType = try { - CatalystSqlParser.parseDataType(hc.getType) - } catch { - case e: ParseException => - throw new SparkException("Cannot recognize hive type string: " + hc.getType, e) - } - - val metadata = new MetadataBuilder().putString(HIVE_TYPE_STRING, hc.getType).build() - val field = StructField( - name = hc.getName, - dataType = columnType, - nullable = true, - metadata = metadata) - Option(hc.getComment).map(field.withComment).getOrElse(field) - } - - // TODO: merge this with HiveClientImpl#toHiveTable - /** Converts the native table metadata representation format CatalogTable to Hive's Table. */ - def toHiveTable(catalogTable: CatalogTable): HiveTable = { - // We start by constructing an API table as Hive performs several important transformations - // internally when converting an API table to a QL table. - val tTable = new org.apache.hadoop.hive.metastore.api.Table() - tTable.setTableName(catalogTable.identifier.table) - tTable.setDbName(catalogTable.database) - - val tableParameters = new java.util.HashMap[String, String]() - tTable.setParameters(tableParameters) - catalogTable.properties.foreach { case (k, v) => tableParameters.put(k, v) } - - tTable.setTableType(catalogTable.tableType match { - case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE.toString - case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString - case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString - }) - - val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() - tTable.setSd(sd) - - // Note: In Hive the schema and partition columns must be disjoint sets - val (partCols, schema) = catalogTable.schema.map(toHiveColumn).partition { c => - catalogTable.partitionColumnNames.contains(c.getName) - } - sd.setCols(schema.asJava) - tTable.setPartitionKeys(partCols.asJava) - - catalogTable.storage.locationUri.foreach(sd.setLocation) - catalogTable.storage.inputFormat.foreach(sd.setInputFormat) - catalogTable.storage.outputFormat.foreach(sd.setOutputFormat) - - val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo - catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib) - sd.setSerdeInfo(serdeInfo) - - val serdeParameters = new java.util.HashMap[String, String]() - catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) } - serdeInfo.setParameters(serdeParameters) - - new HiveTable(tTable) - } - - /** - * Converts the native partition metadata representation format CatalogTablePartition to - * Hive's Partition. 
- */ - def toHivePartition( - catalogTable: CatalogTable, - hiveTable: HiveTable, - partition: CatalogTablePartition): HivePartition = { - val tPartition = new org.apache.hadoop.hive.metastore.api.Partition - tPartition.setDbName(catalogTable.database) - tPartition.setTableName(catalogTable.identifier.table) - tPartition.setValues(catalogTable.partitionColumnNames.map(partition.spec(_)).asJava) - - val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() - tPartition.setSd(sd) - - // Note: In Hive the schema and partition columns must be disjoint sets - val schema = catalogTable.schema.map(toHiveColumn).filter { c => - !catalogTable.partitionColumnNames.contains(c.getName) - } - sd.setCols(schema.asJava) - - partition.storage.locationUri.foreach(sd.setLocation) - partition.storage.inputFormat.foreach(sd.setInputFormat) - partition.storage.outputFormat.foreach(sd.setOutputFormat) - - val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo - sd.setSerdeInfo(serdeInfo) - // maps and lists should be set only after all elements are ready (see HIVE-7975) - partition.storage.serde.foreach(serdeInfo.setSerializationLib) - - val serdeParameters = new java.util.HashMap[String, String]() - catalogTable.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) } - partition.storage.properties.foreach { case (k, v) => serdeParameters.put(k, v) } - serdeInfo.setParameters(serdeParameters) - - new HivePartition(hiveTable, tPartition) - } - /** * Infers the schema for Hive serde tables and returns the CatalogTable with the inferred schema. * When the tables are data source tables or the schema already exists, returns the original @@ -575,12 +460,12 @@ private[spark] object HiveUtils extends Logging { if (DDLUtils.isDatasourceTable(table) || table.dataSchema.nonEmpty) { table } else { - val hiveTable = toHiveTable(table) + val hiveTable = HiveClientImpl.toHiveTable(table) // Note: Hive separates partition columns and the schema, but for us the // partition columns are part of the schema - val partCols = hiveTable.getPartCols.asScala.map(fromHiveColumn) - val schema = StructType(hiveTable.getCols.asScala.map(fromHiveColumn) ++ partCols) - table.copy(schema = schema) + val partCols = hiveTable.getPartCols.asScala.map(HiveClientImpl.fromHiveColumn) + val dataCols = hiveTable.getCols.asScala.map(HiveClientImpl.fromHiveColumn) + table.copy(schema = StructType(dataCols ++ partCols)) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 6394eb6da5..97b120758b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.execution.FileRelation +import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.types.StructField @@ -56,7 +57,7 @@ private[hive] case class MetastoreRelation( override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil - @transient val hiveQlTable: HiveTable = HiveUtils.toHiveTable(catalogTable) + @transient val hiveQlTable: HiveTable = HiveClientImpl.toHiveTable(catalogTable) @transient override def 
computeStats(conf: CatalystConf): Statistics = { catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics( @@ -111,7 +112,8 @@ private[hive] case class MetastoreRelation( } else { allPartitions } - rawPartitions.map(HiveUtils.toHivePartition(catalogTable, hiveQlTable, _)) + + rawPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) } /** Only compare database and tablename, not alias. */ @@ -146,18 +148,17 @@ private[hive] case class MetastoreRelation( val partitionKeys = catalogTable.partitionSchema.map(_.toAttribute) /** Non-partitionKey attributes */ - // TODO: just make this hold the schema itself, not just non-partition columns - val attributes = catalogTable.schema + val dataColKeys = catalogTable.schema .filter { c => !catalogTable.partitionColumnNames.contains(c.name) } .map(_.toAttribute) - val output = attributes ++ partitionKeys + val output = dataColKeys ++ partitionKeys /** An attribute map that can be used to lookup original attributes based on expression id. */ val attributeMap = AttributeMap(output.map(o => (o, o))) /** An attribute map for determining the ordinal for non-partition columns. */ - val columnOrdinals = AttributeMap(attributes.zipWithIndex) + val columnOrdinals = AttributeMap(dataColKeys.zipWithIndex) override def inputFiles: Array[String] = { val partLocations = allPartitions diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index f0d01ebfcf..dc9c3ff335 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -46,7 +46,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException -import org.apache.spark.sql.hive.HiveUtils +import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.hive.client.HiveClientImpl._ import org.apache.spark.sql.types._ import org.apache.spark.util.{CircularBuffer, Utils} @@ -435,7 +436,7 @@ private[hive] class HiveClientImpl( } override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState { - client.createTable(toHiveTable(table), ignoreIfExists) + client.createTable(toHiveTable(table, Some(conf)), ignoreIfExists) } override def dropTable( @@ -447,7 +448,7 @@ private[hive] class HiveClientImpl( } override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState { - val hiveTable = toHiveTable(table) + val hiveTable = toHiveTable(table, Some(conf)) // Do not use `table.qualifiedName` here because this may be a rename val qualifiedTableName = s"${table.database}.$tableName" client.alterTable(qualifiedTableName, hiveTable) @@ -516,7 +517,7 @@ private[hive] class HiveClientImpl( newSpecs: Seq[TablePartitionSpec]): Unit = withHiveState { require(specs.size == newSpecs.size, "number of old and new partition specs differ") val catalogTable = getTable(db, table) - val hiveTable = toHiveTable(catalogTable) + val hiveTable = toHiveTable(catalogTable, Some(conf)) specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => val hivePart = getPartitionOption(catalogTable, oldSpec) .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) } @@ -529,7 +530,7 @@ private[hive] class HiveClientImpl( 
db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit = withHiveState { - val hiveTable = toHiveTable(getTable(db, table)) + val hiveTable = toHiveTable(getTable(db, table), Some(conf)) client.alterPartitions(table, newParts.map { p => toHivePartition(p, hiveTable) }.asJava) } @@ -557,7 +558,7 @@ private[hive] class HiveClientImpl( override def getPartitionOption( table: CatalogTable, spec: TablePartitionSpec): Option[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table) + val hiveTable = toHiveTable(table, Some(conf)) val hivePartition = client.getPartition(hiveTable, spec.asJava, false) Option(hivePartition).map(fromHivePartition) } @@ -569,7 +570,7 @@ private[hive] class HiveClientImpl( override def getPartitions( table: CatalogTable, spec: Option[TablePartitionSpec]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table) + val hiveTable = toHiveTable(table, Some(conf)) val parts = spec match { case None => shim.getAllPartitions(client, hiveTable).map(fromHivePartition) case Some(s) => @@ -583,7 +584,7 @@ private[hive] class HiveClientImpl( override def getPartitionsByFilter( table: CatalogTable, predicates: Seq[Expression]): Seq[CatalogTablePartition] = withHiveState { - val hiveTable = toHiveTable(table) + val hiveTable = toHiveTable(table, Some(conf)) val parts = shim.getPartitionsByFilter(client, hiveTable, predicates).map(fromHivePartition) HiveCatalogMetrics.incrementFetchedPartitions(parts.length) parts @@ -776,20 +777,11 @@ private[hive] class HiveClientImpl( client.dropDatabase(db, true, false, true) } } +} - - /* -------------------------------------------------------- * - | Helper methods for converting to and from Hive classes | - * -------------------------------------------------------- */ - - private def toInputFormat(name: String) = - Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]] - - private def toOutputFormat(name: String) = - Utils.classForName(name) - .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] - - private def toHiveColumn(c: StructField): FieldSchema = { +private[hive] object HiveClientImpl { + /** Converts the native StructField to Hive's FieldSchema. */ + def toHiveColumn(c: StructField): FieldSchema = { val typeString = if (c.metadata.contains(HIVE_TYPE_STRING)) { c.metadata.getString(HIVE_TYPE_STRING) } else { @@ -798,7 +790,8 @@ private[hive] class HiveClientImpl( new FieldSchema(c.name, typeString, c.getComment().orNull) } - private def fromHiveColumn(hc: FieldSchema): StructField = { + /** Builds the native StructField from Hive's FieldSchema. */ + def fromHiveColumn(hc: FieldSchema): StructField = { val columnType = try { CatalystSqlParser.parseDataType(hc.getType) } catch { @@ -815,7 +808,19 @@ private[hive] class HiveClientImpl( Option(hc.getComment).map(field.withComment).getOrElse(field) } - private def toHiveTable(table: CatalogTable): HiveTable = { + private def toInputFormat(name: String) = + Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]] + + private def toOutputFormat(name: String) = + Utils.classForName(name) + .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]] + + /** + * Converts the native table metadata representation format CatalogTable to Hive's Table. 
+ */ + def toHiveTable( + table: CatalogTable, + conf: Option[HiveConf] = None): HiveTable = { val hiveTable = new HiveTable(table.database, table.identifier.table) // For EXTERNAL_TABLE, we also need to set EXTERNAL field in the table properties. // Otherwise, Hive metastore will change the table to a MANAGED_TABLE. @@ -832,7 +837,9 @@ private[hive] class HiveClientImpl( val (partCols, schema) = table.schema.map(toHiveColumn).partition { c => table.partitionColumnNames.contains(c.getName) } - if (schema.isEmpty) { + // after SPARK-19279, it is not allowed to create a hive table with an empty schema, + // so here we should not add a default col schema + if (schema.isEmpty && DDLUtils.isDatasourceTable(table)) { // This is a hack to preserve existing behavior. Before Spark 2.0, we do not // set a default serde here (this was done in Hive), and so if the user provides // an empty schema Hive would automatically populate the schema with a single @@ -845,10 +852,10 @@ private[hive] class HiveClientImpl( hiveTable.setFields(schema.asJava) } hiveTable.setPartCols(partCols.asJava) - hiveTable.setOwner(conf.getUser) + conf.foreach(c => hiveTable.setOwner(c.getUser)) hiveTable.setCreateTime((table.createTime / 1000).toInt) hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt) - table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) } + table.storage.locationUri.foreach { loc => hiveTable.getTTable.getSd.setLocation(loc)} table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) hiveTable.setSerializationLib( @@ -866,7 +873,11 @@ private[hive] class HiveClientImpl( hiveTable } - private def toHivePartition( + /** + * Converts the native partition metadata representation format CatalogTablePartition to + * Hive's Partition. + */ + def toHivePartition( p: CatalogTablePartition, ht: HiveTable): HivePartition = { val tpart = new org.apache.hadoop.hive.metastore.api.Partition @@ -891,7 +902,10 @@ private[hive] class HiveClientImpl( new HivePartition(ht, tpart) } - private def fromHivePartition(hp: HivePartition): CatalogTablePartition = { + /** + * Build the native partition metadata from Hive's Partition. 
+ */ + def fromHivePartition(hp: HivePartition): CatalogTablePartition = { val apiPartition = hp.getTPartition CatalogTablePartition( spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index def6ef3691..140c352fa6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -113,7 +113,7 @@ case class HiveTableScanExec( .mkString(",") hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) - hiveConf.set(serdeConstants.LIST_COLUMNS, relation.attributes.map(_.name).mkString(",")) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataColKeys.map(_.name).mkString(",")) } /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala index 00fdfbcebb..ee632d24b7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala @@ -134,6 +134,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest storage = CatalogStorageFormat.empty.copy( properties = Map("path" -> defaultTableURI("tbl4").toString)), schema = new StructType(), + provider = Some("json"), properties = Map( "spark.sql.sources.provider" -> "json", "spark.sql.sources.schema.numParts" -> "1", @@ -145,6 +146,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest storage = CatalogStorageFormat.empty.copy( properties = Map("path" -> defaultTableURI("tbl5").toString)), schema = simpleSchema, + provider = Some("parquet"), properties = Map( "spark.sql.sources.provider" -> "parquet", "spark.sql.sources.schema.numParts" -> "1", @@ -156,6 +158,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest storage = CatalogStorageFormat.empty.copy( properties = Map("path" -> defaultTableURI("tbl6").toString)), schema = new StructType(), + provider = Some("json"), properties = Map( "spark.sql.sources.provider" -> "json", "spark.sql.sources.schema.numParts" -> "1", @@ -170,6 +173,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest locationUri = Some(defaultTableURI("tbl7").toString + "-__PLACEHOLDER__"), properties = Map("path" -> tempDirUri)), schema = new StructType(), + provider = Some("json"), properties = Map( "spark.sql.sources.provider" -> "json", "spark.sql.sources.schema.numParts" -> "1", @@ -194,6 +198,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest locationUri = Some(defaultTableURI("tbl9").toString + "-__PLACEHOLDER__"), properties = Map("path" -> tempDirUri)), schema = new StructType(), + provider = Some("json"), properties = Map("spark.sql.sources.provider" -> "json")) // A list of all raw tables we want to test, with their expected schema. 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index cf1fe2bc70..e951bbe1dc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -748,6 +748,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv identifier = TableIdentifier(tableName, Some("default")), tableType = CatalogTableType.MANAGED, schema = new StructType, + provider = Some("json"), storage = CatalogStorageFormat( locationUri = None, inputFormat = None, @@ -1276,6 +1277,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv identifier = TableIdentifier("t", Some("default")), tableType = CatalogTableType.MANAGED, schema = new StructType, + provider = Some("json"), storage = CatalogStorageFormat.empty, properties = Map( DATASOURCE_PROVIDER -> "json", @@ -1373,6 +1375,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv properties = Map("path" -> path.getAbsolutePath) ), schema = new StructType(), + provider = Some("parquet"), properties = Map( HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet")) hiveClient.createTable(tableDesc, ignoreIfExists = false) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index ca39c7e845..fe14824cf0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client import java.io.{ByteArrayOutputStream, File, PrintStream} import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.hadoop.mapred.TextInputFormat @@ -570,7 +571,6 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w } } - test(s"$version: SPARK-13709: reading partitioned Avro table with nested schema") { withTempDir { dir => val path = dir.toURI.toString @@ -649,6 +649,29 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w } } + test(s"$version: CTAS for managed data source tables") { + withTable("t", "t1") { + import spark.implicits._ + + val tPath = new Path(spark.sessionState.conf.warehousePath, "t") + Seq("1").toDF("a").write.saveAsTable("t") + val expectedPath = s"file:${tPath.toUri.getPath.stripSuffix("/")}" + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + + assert(table.location.stripSuffix("/") == expectedPath) + assert(tPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(tPath)) + checkAnswer(spark.table("t"), Row("1") :: Nil) + + val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1") + spark.sql("create table t1 using parquet as select 2 as a") + val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + val expectedPath1 = s"file:${t1Path.toUri.getPath.stripSuffix("/")}" + + assert(table1.location.stripSuffix("/") == expectedPath1) + assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path)) + checkAnswer(spark.table("t1"), Row(2) :: Nil) + } + } // TODO: add more tests. 
} } -- GitLab
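
A minimal sketch (not part of the patch) of how the consolidated helpers in the `HiveClientImpl` companion object are meant to be called after this change. The wrapper object and method names below are illustrative only; because the helpers are `private[hive]`, real callers live under the `org.apache.spark.sql.hive` package, as `MetastoreRelation` and `HiveUtils` do.

```scala
// Illustrative only: shows the call shapes introduced by this patch.
package org.apache.spark.sql.hive

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}

import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.hive.client.HiveClientImpl

object ConversionSketch {
  // Metadata-only callers (MetastoreRelation, HiveUtils.inferSchema) omit the HiveConf,
  // so no owner is set on the resulting Hive table.
  def toHiveTableForPlanning(catalogTable: CatalogTable): HiveTable =
    HiveClientImpl.toHiveTable(catalogTable)

  // HiveClientImpl itself passes its live HiveConf so the owner is filled from conf.getUser.
  def toHiveTableForMetastore(catalogTable: CatalogTable, conf: HiveConf): HiveTable =
    HiveClientImpl.toHiveTable(catalogTable, Some(conf))

  // Partition conversion now takes only the partition and the already-converted Hive table;
  // the removed HiveUtils variant additionally required the CatalogTable.
  def toHivePart(part: CatalogTablePartition, hiveTable: HiveTable): HivePartition =
    HiveClientImpl.toHivePartition(part, hiveTable)
}
```

The optional `HiveConf` matters only for the owner field: `toHiveTable` sets it from `HiveConf.getUser` when a configuration is supplied, which is why `HiveClientImpl` passes `Some(conf)` while metadata-only callers leave it out.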