From 14bdb25fd76cf1da8f590231833f47b5b7059f11 Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Tue, 15 Aug 2017 09:04:56 -0700
Subject: [PATCH] [SPARK-18464][SQL][FOLLOWUP] support old table which doesn't
 store schema in table properties

## What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/15900, to fix one more bug:
when the table schema is empty and needs to be inferred at runtime, we should not resolve
parent plans before the schema has been inferred; otherwise the parent plans will be
resolved against an empty schema and may get wrong results for something like `select *`.

The fix is to introduce `UnresolvedCatalogRelation` as a placeholder and replace it with
`LogicalRelation` or `HiveTableRelation` during analysis, which guarantees that parent
plans are not resolved until the schema has been inferred.

## How was this patch tested?

regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18907 from cloud-fan/bug.
---
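(Note, kept after the `---` separator, where `git am` ignores free-form remarks.) The sketch
below is a minimal, self-contained Scala model of the pattern this patch applies. It is a toy
model, not Spark's actual classes: `PlaceholderDemo`, `SelectStar`, `Relation`,
`resolveRelations`, `expandStar`, and `inferSchema` are invented stand-ins for the analyzer,
star expansion, `FindDataSourceTable`, and runtime schema inference.

```scala
// Toy model (NOT Spark's actual classes) of deferring parent-plan resolution
// behind an unresolved placeholder relation.
object PlaceholderDemo {

  sealed trait Plan {
    def children: Seq[Plan]
    def output: Seq[String] // stand-in for Seq[Attribute]
    def resolved: Boolean = children.forall(_.resolved)
  }

  // Placeholder for a catalog table: no output, and never resolved on its own.
  case class UnresolvedRelation(tableName: String) extends Plan {
    def children: Seq[Plan] = Nil
    def output: Seq[String] = Nil
    override def resolved: Boolean = false
  }

  // Concrete relation, created only once the schema is known.
  case class Relation(tableName: String, schema: Seq[String]) extends Plan {
    def children: Seq[Plan] = Nil
    def output: Seq[String] = schema
  }

  // Parent plan standing in for a `select *` projection.
  case class SelectStar(child: Plan, expanded: Seq[String] = Nil) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def output: Seq[String] = expanded
  }

  // Rule 1 (stands in for FindDataSourceTable): swap the placeholder for a
  // concrete relation, "inferring" the schema at this point.
  def resolveRelations(plan: Plan): Plan = plan match {
    case UnresolvedRelation(name) => Relation(name, inferSchema(name))
    case SelectStar(child, exp) => SelectStar(resolveRelations(child), exp)
    case other => other
  }

  // Rule 2: expand `*`, but only once the child reports itself resolved.
  def expandStar(plan: Plan): Plan = plan match {
    case s @ SelectStar(child, Nil) if child.resolved => s.copy(expanded = child.output)
    case other => other
  }

  // Pretend runtime schema inference.
  private def inferSchema(name: String): Seq[String] = Seq("i", "j")

  def main(args: Array[String]): Unit = {
    val parsed = SelectStar(UnresolvedRelation("old"))
    // The guard on `child.resolved` stops `*` from expanding against the
    // placeholder's empty output...
    assert(expandStar(parsed).output.isEmpty)
    // ...so expansion happens only after the schema has been inferred.
    val analyzed = expandStar(resolveRelations(parsed))
    assert(analyzed.output == Seq("i", "j"))
  }
}
```

The design point mirrors `UnresolvedCatalogRelation`: because the placeholder reports
`resolved = false` and empty output, a rule that fires only on resolved children cannot
expand `*` against an empty schema; it has to wait until the concrete relation, carrying
the inferred schema, has been substituted in.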
 .../sql/catalyst/catalog/SessionCatalog.scala |  7 +--
 .../sql/catalyst/catalog/interface.scala      | 25 ++++++----
 .../catalog/SessionCatalogSuite.scala         |  5 +-
 .../apache/spark/sql/DataFrameWriter.scala    | 11 ++---
 .../scala/org/apache/spark/sql/Dataset.scala  |  4 +-
 .../execution/OptimizeMetadataOnlyQuery.scala |  6 +--
 .../datasources/DataSourceStrategy.scala      | 49 +++++++++++--------
 .../sql/execution/datasources/rules.scala     |  4 +-
 .../sql/StatisticsCollectionTestBase.scala    |  4 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  6 +--
 .../spark/sql/hive/HiveStrategies.scala       | 17 +++----
 .../hive/execution/HiveTableScanExec.scala    |  6 +--
 .../sql/hive/MetastoreDataSourcesSuite.scala  |  1 +
 .../spark/sql/hive/StatisticsSuite.scala      | 10 ++--
 .../sql/hive/execution/SQLQuerySuite.scala    | 10 ++--
 .../spark/sql/hive/orc/OrcQuerySuite.scala    |  4 +-
 .../apache/spark/sql/hive/parquetSuites.scala |  4 +-
 17 files changed, 90 insertions(+), 83 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e3237a8846..6030d90ed9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -678,12 +678,7 @@ class SessionCatalog(
             child = parser.parsePlan(viewText))
           SubqueryAlias(table, child)
         } else {
-          val tableRelation = CatalogRelation(
-            metadata,
-            // we assume all the columns are nullable.
-            metadata.dataSchema.asNullable.toAttributes,
-            metadata.partitionSchema.asNullable.toAttributes)
-          SubqueryAlias(table, tableRelation)
+          SubqueryAlias(table, UnresolvedCatalogRelation(metadata))
         }
       } else {
         SubqueryAlias(table, tempTables(table))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index f86510624a..5a8c4e7610 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -405,11 +404,22 @@ object CatalogTypes {
   type TablePartitionSpec = Map[String, String]
 }
 
+/**
+ * A placeholder for a table relation, which will be replaced by a concrete relation like
+ * `LogicalRelation` or `HiveTableRelation` during analysis.
+ */
+case class UnresolvedCatalogRelation(tableMeta: CatalogTable) extends LeafNode {
+  assert(tableMeta.identifier.database.isDefined)
+  override lazy val resolved: Boolean = false
+  override def output: Seq[Attribute] = Nil
+}
+
 /**
- * A [[LogicalPlan]] that represents a table.
+ * A `LogicalPlan` that represents a Hive table.
+ *
+ * TODO: remove this after we completely make Hive a data source.
  */
-case class CatalogRelation(
+case class HiveTableRelation(
     tableMeta: CatalogTable,
     dataCols: Seq[AttributeReference],
     partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
@@ -423,7 +433,7 @@ case class CatalogRelation(
   def isPartitioned: Boolean = partitionCols.nonEmpty
 
   override def equals(relation: Any): Boolean = relation match {
-    case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output
+    case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output
     case _ => false
   }
 
@@ -431,7 +441,7 @@ case class CatalogRelation(
     Objects.hashCode(tableMeta.identifier, output)
   }
 
-  override lazy val canonicalized: LogicalPlan = copy(
+  override lazy val canonicalized: HiveTableRelation = copy(
     tableMeta = tableMeta.copy(
       storage = CatalogStorageFormat.empty,
       createTime = -1
@@ -444,15 +454,12 @@ case class CatalogRelation(
     })
 
   override def computeStats(): Statistics = {
-    // For data source tables, we will create a `LogicalRelation` and won't call this method, for
-    // hive serde tables, we will always generate a statistics.
-    // TODO: unify the table stats generation.
     tableMeta.stats.map(_.toPlanStats(output)).getOrElse {
       throw new IllegalStateException("table stats must be specified.")
     }
   }
 
-  override def newInstance(): LogicalPlan = copy(
+  override def newInstance(): HiveTableRelation = copy(
     dataCols = dataCols.map(_.newInstance()),
     partitionCols = partitionCols.map(_.newInstance()))
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index d2b670d06d..1cce19989c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -518,14 +517,14 @@ abstract class SessionCatalogSuite extends AnalysisTest {
       catalog.setCurrentDatabase("db2")
       // If we explicitly specify the database, we'll look up the relation in that database
       assert(catalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head
-        .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+        .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
       // Otherwise, we'll first look up a temporary table with the same name
       assert(catalog.lookupRelation(TableIdentifier("tbl1"))
         == SubqueryAlias("tbl1", tempTable1))
       // Then, if that does not exist, look up the relation in the current database
       catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
       assert(catalog.lookupRelation(TableIdentifier("tbl1")).children.head
-        .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+        .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 65c9ef4077..877051a60e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -24,11 +24,11 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
@@ -372,8 +372,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       // Get all input data source or hive relations of the query.
       val srcRelations = df.logicalPlan.collect {
         case LogicalRelation(src: BaseRelation, _, _) => src
-        case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) =>
-          relation.tableMeta.identifier
+        case relation: HiveTableRelation => relation.tableMeta.identifier
       }
 
       val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
@@ -383,8 +382,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           throw new AnalysisException(
             s"Cannot overwrite table $tableName that is also being read from")
         // check hive table relation when overwrite mode
-        case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta)
-          && srcRelations.contains(relation.tableMeta.identifier) =>
+        case relation: HiveTableRelation
+            if srcRelations.contains(relation.tableMeta.identifier) =>
           throw new AnalysisException(
             s"Cannot overwrite table $tableName that is also being read from")
         case _ => // OK
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index aa968d8b3c..a9887eb952 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -36,7 +36,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
@@ -2965,7 +2965,7 @@ class Dataset[T] private[sql](
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
         fr.inputFiles
-      case r: CatalogRelation if DDLUtils.isHiveTable(r.tableMeta) =>
+      case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
     }.flatten
     files.toSet.toArray
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 02f45abaa3..301c4f0264 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -99,7 +99,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
           val partitionData = fsRelation.location.listFiles(Nil, Nil)
           LocalRelation(partAttrs, partitionData.map(_.values))
 
-        case relation: CatalogRelation =>
+        case relation: HiveTableRelation =>
           val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
           val caseInsensitiveProperties =
             CaseInsensitiveMap(relation.tableMeta.storage.properties)
@@ -135,7 +135,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
         val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
         Some((AttributeSet(partAttrs), l))
 
-      case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+      case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
         val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
         Some((AttributeSet(partAttrs), relation))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 587b9b450e..2370177427 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -25,12 +25,12 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
@@ -207,15 +207,16 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
 
 /**
- * Replaces [[CatalogRelation]] with data source table if its table provider is not hive.
+ * Replaces [[UnresolvedCatalogRelation]] with concrete relation logical plans.
+ *
+ * TODO: we should remove the special handling for Hive tables after completely making Hive a
+ * data source.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
-    val table = r.tableMeta
+  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
-    val catalogProxy = sparkSession.sessionState.catalog
-
-    val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
+    val catalog = sparkSession.sessionState.catalog
+    catalog.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
         val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
         val dataSource =
@@ -232,24 +233,30 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
           LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
         }
-    }).asInstanceOf[LogicalRelation]
+    })
+  }
 
-    if (r.output.isEmpty) {
-      // It's possible that the table schema is empty and need to be inferred at runtime. For this
-      // case, we don't need to change the output of the cached plan.
-      plan
-    } else {
-      plan.copy(output = r.output)
-    }
+  private def readHiveTable(table: CatalogTable): LogicalPlan = {
+    HiveTableRelation(
+      table,
+      // Hive table columns are always nullable.
+      table.dataSchema.asNullable.toAttributes,
+      table.partitionSchema.asNullable.toAttributes)
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _)
-        if DDLUtils.isDatasourceTable(r.tableMeta) =>
-      i.copy(table = readDataSourceTable(r))
+    case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
+        if DDLUtils.isDatasourceTable(tableMeta) =>
+      i.copy(table = readDataSourceTable(tableMeta))
+
+    case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
+      i.copy(table = readHiveTable(tableMeta))
+
+    case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
+      readDataSourceTable(tableMeta)
 
-    case r: CatalogRelation if DDLUtils.isDatasourceTable(r.tableMeta) =>
-      readDataSourceTable(r)
+    case UnresolvedCatalogRelation(tableMeta) =>
+      readHiveTable(tableMeta)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index cb8dc1e041..84acca242a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -382,7 +382,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
       table match {
-        case relation: CatalogRelation =>
+        case relation: HiveTableRelation =>
           val metadata = relation.tableMeta
           preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
         case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
@@ -427,7 +427,7 @@ object PreReadCheck extends (LogicalPlan => Unit) {
   private def checkNumInputFileBlockSources(e: Expression, operator: LogicalPlan): Int = {
     operator match {
-      case _: CatalogRelation => 1
+      case _: HiveTableRelation => 1
       case _ @ LogicalRelation(_: HadoopFsRelation, _, _) => 1
       case _: LeafNode => 0
       // UNION ALL has multiple children, but these children do not concurrently use InputFileBlock.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 41569762d3..5916cd76b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -171,7 +171,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     // Analyze only one column.
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1") val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect { - case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta) + case catalogRel: HiveTableRelation => (catalogRel, catalogRel.tableMeta) case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get) }.head val emptyColStat = ColumnStat(0, None, None, 0, 4, 4) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 808dc013f1..8bab059ed5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -112,7 +112,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } def convertToLogicalRelation( - relation: CatalogRelation, + relation: HiveTableRelation, options: Map[String, String], fileFormatClass: Class[_ <: FileFormat], fileType: String): LogicalRelation = { @@ -210,7 +210,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log logicalRelation }) } - // The inferred schema may have different filed names as the table schema, we should respect + // The inferred schema may have different field names as the table schema, we should respect // it, but also respect the exprId in table relation output. assert(result.output.length == relation.output.length && result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType }) @@ -221,7 +221,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } private def inferIfNeeded( - relation: CatalogRelation, + relation: HiveTableRelation, options: Map[String, String], fileFormat: FileFormat, fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 9c60d22d35..ae1e7e72e8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -21,10 +21,9 @@ import java.io.IOException import java.util.Locale import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogStorageFormat, CatalogTable} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation} @@ -116,7 +115,7 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case relation: CatalogRelation + case relation: HiveTableRelation if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => val table = relation.tableMeta val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { @@ -147,7 +146,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { */ object HiveAnalysis extends Rule[LogicalPlan] { override def 
-    case InsertIntoTable(r: CatalogRelation, partSpec, query, overwrite, ifPartitionNotExists)
+    case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists)
         if DDLUtils.isHiveTable(r.tableMeta) =>
       InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists)
 
@@ -171,13 +170,13 @@ object HiveAnalysis extends Rule[LogicalPlan] {
 case class RelationConversions(
     conf: SQLConf,
     sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
-  private def isConvertible(relation: CatalogRelation): Boolean = {
+  private def isConvertible(relation: HiveTableRelation): Boolean = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
     serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
       serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
-  private def convert(relation: CatalogRelation): LogicalRelation = {
+  private def convert(relation: HiveTableRelation): LogicalRelation = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
     if (serde.contains("parquet")) {
       val options = Map(ParquetOptions.MERGE_SCHEMA ->
@@ -194,14 +193,14 @@ case class RelationConversions(
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan transformUp {
       // Write path
-      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifPartitionNotExists)
+      case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
         // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
         if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
           !r.isPartitioned && isConvertible(r) =>
         InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
 
       // Read path
-      case relation: CatalogRelation
+      case relation: HiveTableRelation
         if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
         convert(relation)
     }
@@ -229,7 +228,7 @@ private[hive] trait HiveStrategies {
    */
  object HiveTableScans extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case PhysicalOperation(projectList, predicates, relation: CatalogRelation) =>
+      case PhysicalOperation(projectList, predicates, relation: HiveTableRelation) =>
        // Filter out all predicates that only deal with partition keys, these are given to the
        // hive table scan operator to be used for partition pruning.
        val partitionKeyIds = AttributeSet(relation.partitionCols)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index e191071efb..896f24f2e2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.execution._
@@ -50,7 +50,7 @@ import org.apache.spark.util.Utils
 private[hive] case class HiveTableScanExec(
     requestedAttributes: Seq[Attribute],
-    relation: CatalogRelation,
+    relation: HiveTableRelation,
     partitionPruningPred: Seq[Expression])(
     @transient private val sparkSession: SparkSession)
   extends LeafExecNode {
@@ -205,7 +205,7 @@ case class HiveTableScanExec(
     val input: AttributeSeq = relation.output
     HiveTableScanExec(
       requestedAttributes.map(QueryPlan.normalizeExprId(_, input)),
-      relation.canonicalized.asInstanceOf[CatalogRelation],
+      relation.canonicalized,
       QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession)
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c785aca985..e01198dd53 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1372,6 +1372,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       hiveClient.createTable(tableDesc, ignoreIfExists = false)
 
       checkAnswer(spark.table("old"), Row(1, "a"))
+      checkAnswer(sql("select * from old"), Row(1, "a"))
 
       val expectedSchema = StructType(Seq(
         StructField("i", IntegerType, nullable = true),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 36566bffb9..71cf79c473 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -72,7 +72,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
            |LOCATION '${tempDir.toURI}'""".stripMargin)
 
       val relation = spark.table("csv_table").queryExecution.analyzed.children.head
-        .asInstanceOf[CatalogRelation]
+        .asInstanceOf[HiveTableRelation]
 
       val properties = relation.tableMeta.ignoredProperties
       assert(properties("totalSize").toLong <= 0, "external table totalSize must be <= 0")
@@ -905,7 +905,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
   test("estimates the size of a test Hive serde tables") {
     val df = sql("""SELECT * FROM src""")
     val sizes = df.queryExecution.analyzed.collect {
-      case relation: CatalogRelation => relation.stats.sizeInBytes
+      case relation: HiveTableRelation => relation.stats.sizeInBytes
     }
     assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
     assert(sizes(0).equals(BigInt(5812)),
@@ -965,7 +965,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       () => (),
       metastoreQuery,
       metastoreAnswer,
-      implicitly[ClassTag[CatalogRelation]]
+      implicitly[ClassTag[HiveTableRelation]]
     )
   }
 
@@ -979,7 +979,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
     // Assert src has a size smaller than the threshold.
     val sizes = df.queryExecution.analyzed.collect {
-      case relation: CatalogRelation => relation.stats.sizeInBytes
+      case relation: HiveTableRelation => relation.stats.sizeInBytes
     }
     assert(sizes.size === 2 && sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold
       && sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 45bbb0c674..ef3d9b27aa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.TestUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
@@ -454,7 +454,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         case LogicalRelation(r: HadoopFsRelation, _, _) =>
           if (!isDataSourceTable) {
             fail(
-              s"${classOf[CatalogRelation].getCanonicalName} is expected, but found " +
+              s"${classOf[HiveTableRelation].getCanonicalName} is expected, but found " +
                 s"${HadoopFsRelation.getClass.getCanonicalName}.")
           }
           userSpecifiedLocation match {
@@ -464,11 +464,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
           }
           assert(catalogTable.provider.get === format)
 
-        case r: CatalogRelation =>
+        case r: HiveTableRelation =>
           if (isDataSourceTable) {
             fail(
               s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
-                s"${classOf[CatalogRelation].getCanonicalName}.")
+                s"${classOf[HiveTableRelation].getCanonicalName}.")
           }
           userSpecifiedLocation match {
             case Some(location) =>
@@ -948,7 +948,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
       sql("CREATE TABLE explodeTest (key bigInt)")
       table("explodeTest").queryExecution.analyzed match {
-        case SubqueryAlias(_, r: CatalogRelation) => // OK
+        case SubqueryAlias(_, r: HiveTableRelation) => // OK
        case _ =>
          fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation")
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 8c855730c3..60ccd996d6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHive._
@@ -475,7 +475,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
         }
       } else {
         queryExecution.analyzed.collectFirst {
-          case _: CatalogRelation => ()
+          case _: HiveTableRelation => ()
         }.getOrElse {
           fail(s"Expecting no conversion from orc to data sources, " +
             s"but got:\n$queryExecution")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 23f21e6b99..303884da19 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
@@ -812,7 +812,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
       }
     } else {
       queryExecution.analyzed.collectFirst {
-        case _: CatalogRelation =>
+        case _: HiveTableRelation =>
       }.getOrElse {
         fail(s"Expecting no conversion from parquet to data sources, " +
           s"but got:\n$queryExecution")
--