From 2e62560024999c215cf2373fc9a8070bb2ad5c58 Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Thu, 19 Jan 2017 00:07:48 -0800
Subject: [PATCH] [SPARK-19265][SQL] make table relation cache general and does not depend on hive

## What changes were proposed in this pull request?

We have a table relation plan cache in `HiveMetastoreCatalog`, which caches a lot of things: file status, resolved data source, inferred schema, etc. However, it doesn't make sense to limit this cache to Hive support; we should move it to the SQL core module so that users can use this cache without Hive support. It also reduces the size of `HiveMetastoreCatalog`, making it easier to remove eventually.

Main changes:
1. Move the table relation cache to `SessionCatalog`.
2. `SessionCatalog.lookupRelation` will return `SimpleCatalogRelation`, and the analyzer will convert it to `LogicalRelation` or `MetastoreRelation` later, so `HiveSessionCatalog` no longer needs to override `lookupRelation`.
3. `FindDataSourceTable` will read/write the table relation cache.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16621 from cloud-fan/plan-cache.
---
 .../sql/catalyst/catalog/SessionCatalog.scala | 33 +++++--
 .../spark/sql/catalyst/identifiers.scala      |  4 +-
 .../catalog/SessionCatalogSuite.scala         |  6 +-
 .../apache/spark/sql/DataFrameWriter.scala    |  4 +-
 .../command/AnalyzeColumnCommand.scala        |  3 +-
 .../command/AnalyzeTableCommand.scala         |  3 +-
 .../spark/sql/execution/command/tables.scala  |  2 +-
 .../datasources/DataSourceStrategy.scala      | 60 +++++++-----
 .../spark/sql/internal/CatalogImpl.scala      |  2 +-
 .../org/apache/spark/sql/DataFrameSuite.scala | 11 ---
 .../sql/execution/command/DDLSuite.scala      |  3 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 98 ++-----------------
 .../spark/sql/hive/HiveSessionCatalog.scala   | 37 +------
 .../spark/sql/hive/HiveSessionState.scala     |  2 +
 .../spark/sql/hive/HiveStrategies.scala       | 17 +++-
 .../CreateHiveTableAsSelectCommand.scala      |  8 +-
 .../apache/spark/sql/hive/test/TestHive.scala |  2 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala  | 22 +++++
 .../spark/sql/hive/StatisticsSuite.scala      |  8 +-
 .../sql/hive/execution/SQLQuerySuite.scala    | 17 ++--
 20 files changed, 144 insertions(+), 198 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8008fcd639..e9543f7987 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -21,13 +21,13 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
 
+import com.google.common.cache.{Cache, CacheBuilder}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
@@ -117,6 +117,14 @@ class SessionCatalog(
     if (conf.caseSensitiveAnalysis) name else name.toLowerCase
   }
 
+  /**
+   * A cache of 
qualified table name to table relation plan. + */ + val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = { + // TODO: create a config instead of hardcode 1000 here. + CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, LogicalPlan]() + } + /** * This method is used to make the given path qualified before we * store this path in the underlying external catalog. So, when a path @@ -573,7 +581,7 @@ class SessionCatalog( val relationAlias = alias.getOrElse(table) if (db == globalTempViewManager.database) { globalTempViewManager.get(table).map { viewDef => - SubqueryAlias(relationAlias, viewDef, Some(name)) + SubqueryAlias(relationAlias, viewDef, None) }.getOrElse(throw new NoSuchTableException(db, table)) } else if (name.database.isDefined || !tempTables.contains(table)) { val metadata = externalCatalog.getTable(db, table) @@ -586,12 +594,12 @@ class SessionCatalog( desc = metadata, output = metadata.schema.toAttributes, child = parser.parsePlan(viewText)) - SubqueryAlias(relationAlias, child, Option(name)) + SubqueryAlias(relationAlias, child, Some(name.copy(table = table, database = Some(db)))) } else { SubqueryAlias(relationAlias, SimpleCatalogRelation(metadata), None) } } else { - SubqueryAlias(relationAlias, tempTables(table), Option(name)) + SubqueryAlias(relationAlias, tempTables(table), None) } } } @@ -651,14 +659,21 @@ class SessionCatalog( * Refresh the cache entry for a metastore table, if any. */ def refreshTable(name: TableIdentifier): Unit = synchronized { + val dbName = formatDatabaseName(name.database.getOrElse(currentDb)) + val tableName = formatTableName(name.table) + // Go through temporary tables and invalidate them. - // If the database is defined, this is definitely not a temp table. + // If the database is defined, this may be a global temporary view. // If the database is not defined, there is a good chance this is a temp table. if (name.database.isEmpty) { - tempTables.get(formatTableName(name.table)).foreach(_.refresh()) - } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) { - globalTempViewManager.get(formatTableName(name.table)).foreach(_.refresh()) + tempTables.get(tableName).foreach(_.refresh()) + } else if (dbName == globalTempViewManager.database) { + globalTempViewManager.get(tableName).foreach(_.refresh()) } + + // Also invalidate the table relation cache. 
+ val qualifiedTableName = QualifiedTableName(dbName, tableName) + tableRelationCache.invalidate(qualifiedTableName) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala index 834897b850..26697e9867 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala @@ -60,9 +60,11 @@ case class TableIdentifier(table: String, database: Option[String]) override val identifier: String = table def this(table: String) = this(table, None) - } +/** A fully qualified identifier for a table (i.e., database.tableName) */ +case class QualifiedTableName(database: String, name: String) + object TableIdentifier { def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 7a7de25acb..f935de68af 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -436,7 +436,7 @@ class SessionCatalogSuite extends PlanTest { == SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None)) // Otherwise, we'll first look up a temporary table with the same name assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")) - == SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1")))) + == SubqueryAlias("tbl1", tempTable1, None)) // Then, if that does not exist, look up the relation in the current database sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")) @@ -462,7 +462,7 @@ class SessionCatalogSuite extends PlanTest { val tmpView = Range(1, 10, 2, 10) catalog.createTempView("vw1", tmpView, overrideIfExists = false) val plan = catalog.lookupRelation(TableIdentifier("vw1"), Option("range")) - assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1")))) + assert(plan == SubqueryAlias("range", tmpView, None)) } test("look up view relation") { @@ -479,7 +479,7 @@ class SessionCatalogSuite extends PlanTest { // Look up a view using current database of the session catalog. 
sessionCatalog.setCurrentDatabase("db3") comparePlans(sessionCatalog.lookupRelation(TableIdentifier("view1")), - SubqueryAlias("view1", view, Some(TableIdentifier("view1")))) + SubqueryAlias("view1", view, Some(TableIdentifier("view1", Some("db3"))))) } test("table exists") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 7fc03bd5ef..ff1f0177e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -386,7 +386,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) => relation.catalogTable.identifier } - EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match { + + val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed + EliminateSubqueryAliases(tableRelation) match { // check if the table is a data source table (the relation is a BaseRelation). case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) => throw new AnalysisException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala index 1340c9bece..d024a3673d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala @@ -40,7 +40,8 @@ case class AnalyzeColumnCommand( val sessionState = sparkSession.sessionState val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) - val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + val relation = + EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed) // Compute total size val (catalogTable: CatalogTable, sizeInBytes: Long) = relation match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala index 4a994e34af..30b6cc7617 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala @@ -41,7 +41,8 @@ case class AnalyzeTableCommand( val sessionState = sparkSession.sessionState val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) - val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) + val relation = + EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed) relation match { case relation: CatalogRelation => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 246894813c..1b596c97a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -450,7 +450,7 @@ case class DescribeTableCommand( if (metadata.schema.isEmpty) { // In older 
version(prior to 2.1) of Spark, the table schema can be empty and should be // inferred at runtime. We should still support it. - describeSchema(catalog.lookupRelation(metadata.identifier).schema, result) + describeSchema(sparkSession.table(metadata.identifier).schema, result) } else { describeSchema(metadata.schema, result) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 3d3db06eee..21b07ee85a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -17,18 +17,17 @@ package org.apache.spark.sql.execution.datasources -import scala.collection.mutable.ArrayBuffer +import java.util.concurrent.Callable -import org.apache.hadoop.fs.Path +import scala.collection.mutable.ArrayBuffer import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, SimpleCatalogRelation} -import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation} import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -37,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPa import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -215,37 +215,43 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { /** - * Replaces [[SimpleCatalogRelation]] with data source table if its table property contains data - * source information. + * Replaces [[SimpleCatalogRelation]] with data source table if its table provider is not hive. 
*/ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { - private def readDataSourceTable( - sparkSession: SparkSession, - simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = { - val table = simpleCatalogRelation.catalogTable - val pathOption = table.storage.locationUri.map("path" -> _) - val dataSource = - DataSource( - sparkSession, - userSpecifiedSchema = Some(table.schema), - partitionColumns = table.partitionColumnNames, - bucketSpec = table.bucketSpec, - className = table.provider.get, - options = table.storage.properties ++ pathOption) - - LogicalRelation( - dataSource.resolveRelation(), - expectedOutputAttributes = Some(simpleCatalogRelation.output), - catalogTable = Some(table)) + private def readDataSourceTable(table: CatalogTable): LogicalPlan = { + val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table) + val cache = sparkSession.sessionState.catalog.tableRelationCache + val withHiveSupport = + sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive" + + cache.get(qualifiedTableName, new Callable[LogicalPlan]() { + override def call(): LogicalPlan = { + val pathOption = table.storage.locationUri.map("path" -> _) + val dataSource = + DataSource( + sparkSession, + // In older version(prior to 2.1) of Spark, the table schema can be empty and should be + // inferred at runtime. We should still support it. + userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema), + partitionColumns = table.partitionColumnNames, + bucketSpec = table.bucketSpec, + className = table.provider.get, + options = table.storage.properties ++ pathOption, + // TODO: improve `InMemoryCatalog` and remove this limitation. + catalogTable = if (withHiveSupport) Some(table) else None) + + LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table)) + } + }) } override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) if DDLUtils.isDatasourceTable(s.metadata) => - i.copy(table = readDataSourceTable(sparkSession, s)) + i.copy(table = readDataSourceTable(s.metadata)) case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) => - readDataSourceTable(sparkSession, s) + readDataSourceTable(s.metadata) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 9136a83bc2..3d9f41832b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -440,7 +440,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { // If this table is cached as an InMemoryRelation, drop the original // cached version and make the new version cached lazily. - val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent) + val logicalPlan = sparkSession.table(tableIdent).queryExecution.analyzed // Use lookupCachedData directly since RefreshTable also takes databaseName. 
val isCached = sparkSession.sharedState.cacheManager.lookupCachedData(logicalPlan).nonEmpty if (isCached) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index f4df80fd9c..621a46adf4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1626,17 +1626,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(d.size == d.distinct.size) } - test("SPARK-17625: data source table in InMemoryCatalog should guarantee output consistency") { - val tableName = "tbl" - withTable(tableName) { - spark.range(10).select('id as 'i, 'id as 'j).write.saveAsTable(tableName) - val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)) - val expr = relation.resolve("i") - val qe = spark.sessionState.executePlan(Project(Seq(expr), relation)) - qe.assertAnalyzed() - } - } - private def verifyNullabilityInFilterExec( df: DataFrame, expr: String, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 97990a6d9b..b4c9e276ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1790,7 +1790,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } test("SET LOCATION for managed table") { - withTable("src") { + withTable("tbl") { withTempDir { dir => sql("CREATE TABLE tbl(i INT) USING parquet") sql("INSERT INTO tbl SELECT 1") @@ -1799,6 +1799,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { .getTableMetadata(TableIdentifier("tbl")).storage.locationUri.get sql(s"ALTER TABLE tbl SET LOCATION '${dir.getCanonicalPath}'") + spark.catalog.refreshTable("tbl") // SET LOCATION won't move data from previous table path to new table path. assert(spark.table("tbl").count() == 0) // the previous table path should be still there. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index e4b1f6ae3e..faa76b73fd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -17,17 +17,15 @@ package org.apache.spark.sql.hive -import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import com.google.common.util.concurrent.Striped import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} import org.apache.spark.sql.hive.orc.OrcFileFormat @@ -41,9 +39,7 @@ import org.apache.spark.sql.types._ */ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging { private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState] - - /** A fully qualified identifier for a table (i.e., database.tableName) */ - case class QualifiedTableName(database: String, name: String) + private lazy val tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase @@ -65,45 +61,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } } - /** A cache of Spark SQL data source tables that have been accessed. */ - protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = { - val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() { - override def load(in: QualifiedTableName): LogicalPlan = { - logDebug(s"Creating new cached data source for $in") - val table = sparkSession.sharedState.externalCatalog.getTable(in.database, in.name) - - val pathOption = table.storage.locationUri.map("path" -> _) - val dataSource = - DataSource( - sparkSession, - // In older version(prior to 2.1) of Spark, the table schema can be empty and should be - // inferred at runtime. We should still support it. - userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema), - partitionColumns = table.partitionColumnNames, - bucketSpec = table.bucketSpec, - className = table.provider.get, - options = table.storage.properties ++ pathOption, - catalogTable = Some(table)) - - LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table)) - } - } - - CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader) - } - - def refreshTable(tableIdent: TableIdentifier): Unit = { - // refreshTable does not eagerly reload the cache. It just invalidate the cache. - // Next time when we use the table, it will be populated in the cache. 
- // Since we also cache ParquetRelations converted from Hive Parquet tables and - // adding converted ParquetRelations into the cache is not defined in the load function - // of the cache (instead, we add the cache entry in convertToParquetRelation), - // it is better at here to invalidate the cache to avoid confusing waring logs from the - // cache loader (e.g. cannot find data source provider, which is only defined for - // data source table.). - cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent)) - } - def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = { // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName) val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) @@ -111,45 +68,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log new Path(new Path(dbLocation), tblName).toString } - /** - * Returns a [[LogicalPlan]] that represents the given table or view from Hive metastore. - * - * @param tableIdent The name of the table/view that we look up. - * @param alias The alias name of the table/view that we look up. - * @return a [[LogicalPlan]] that represents the given table or view from Hive metastore. - */ - def lookupRelation( - tableIdent: TableIdentifier, - alias: Option[String]): LogicalPlan = { - val qualifiedTableName = getQualifiedTableName(tableIdent) - val table = sparkSession.sharedState.externalCatalog.getTable( - qualifiedTableName.database, qualifiedTableName.name) - - if (DDLUtils.isDatasourceTable(table)) { - val dataSourceTable = cachedDataSourceTables(qualifiedTableName) - val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable, None) - // Then, if alias is specified, wrap the table with a Subquery using the alias. - // Otherwise, wrap the table with a Subquery using the table name. - alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable) - } else if (table.tableType == CatalogTableType.VIEW) { - val tableIdentifier = table.identifier - val viewText = table.viewText.getOrElse(sys.error("Invalid view without text.")) - // The relation is a view, so we wrap the relation by: - // 1. Add a [[View]] operator over the relation to keep track of the view desc; - // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view. 
- val child = View( - desc = table, - output = table.schema.toAttributes, - child = sparkSession.sessionState.sqlParser.parsePlan(viewText)) - SubqueryAlias(alias.getOrElse(tableIdentifier.table), child, Option(tableIdentifier)) - } else { - val qualifiedTable = - MetastoreRelation( - qualifiedTableName.database, qualifiedTableName.name)(table, sparkSession) - alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable) - } - } - private def getCached( tableIdentifier: QualifiedTableName, pathsInMetastore: Seq[Path], @@ -159,7 +77,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log expectedBucketSpec: Option[BucketSpec], partitionSchema: Option[StructType]): Option[LogicalRelation] = { - cachedDataSourceTables.getIfPresent(tableIdentifier) match { + tableRelationCache.getIfPresent(tableIdentifier) match { case null => None // Cache miss case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) => val cachedRelationFileFormatClass = relation.fileFormat.getClass @@ -178,7 +96,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log Some(logical) } else { // If the cached relation is not updated, we invalidate it right away. - cachedDataSourceTables.invalidate(tableIdentifier) + tableRelationCache.invalidate(tableIdentifier) None } case _ => @@ -187,7 +105,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log s"should be stored as $expectedFileFormat. However, we are getting " + s"a ${relation.fileFormat} from the metastore cache. This cached " + s"entry will be invalidated.") - cachedDataSourceTables.invalidate(tableIdentifier) + tableRelationCache.invalidate(tableIdentifier) None } case other => @@ -195,7 +113,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " + s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. 
" + s"This cached entry will be invalidated.") - cachedDataSourceTables.invalidate(tableIdentifier) + tableRelationCache.invalidate(tableIdentifier) None } } @@ -270,7 +188,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val created = LogicalRelation(relation, catalogTable = Some(metastoreRelation.catalogTable)) - cachedDataSourceTables.put(tableIdentifier, created) + tableRelationCache.put(tableIdentifier, created) created } @@ -298,7 +216,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log className = fileType).resolveRelation(), catalogTable = Some(metastoreRelation.catalogTable)) - cachedDataSourceTables.put(tableIdentifier, created) + tableRelationCache.put(tableIdentifier, created) created } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index b3cbbedbe1..44ef5cce2e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -27,12 +27,12 @@ import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, Gener import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchTableException} +import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper import org.apache.spark.sql.internal.SQLConf @@ -58,28 +58,6 @@ private[sql] class HiveSessionCatalog( hadoopConf, parser) { - override def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = { - synchronized { - val table = formatTableName(name.table) - val db = formatDatabaseName(name.database.getOrElse(currentDb)) - if (db == globalTempViewManager.database) { - val relationAlias = alias.getOrElse(table) - globalTempViewManager.get(table).map { viewDef => - SubqueryAlias(relationAlias, viewDef, Some(name)) - }.getOrElse(throw new NoSuchTableException(db, table)) - } else if (name.database.isDefined || !tempTables.contains(table)) { - val newName = name.copy(database = Some(db), table = table) - metastoreCatalog.lookupRelation(newName, alias) - } else { - val relation = tempTables(table) - val tableWithQualifiers = SubqueryAlias(table, relation, None) - // If an alias was specified by the lookup, wrap the plan in a subquery so that - // attributes are properly qualified with this alias. 
- alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers) - } - } - } - // ---------------------------------------------------------------- // | Methods and fields for interacting with HiveMetastoreCatalog | // ---------------------------------------------------------------- @@ -93,15 +71,6 @@ private[sql] class HiveSessionCatalog( val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions - override def refreshTable(name: TableIdentifier): Unit = { - super.refreshTable(name) - metastoreCatalog.refreshTable(name) - } - - def invalidateCache(): Unit = { - metastoreCatalog.cachedDataSourceTables.invalidateAll() - } - def hiveDefaultTableFilePath(name: TableIdentifier): String = { metastoreCatalog.hiveDefaultTableFilePath(name) } @@ -109,7 +78,7 @@ private[sql] class HiveSessionCatalog( // For testing only private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = { val key = metastoreCatalog.getQualifiedTableName(table) - metastoreCatalog.cachedDataSourceTables.getIfPresent(key) + sparkSession.sessionState.catalog.tableRelationCache.getIfPresent(key) } override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 4e30d038b1..d3cef6e0cb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -67,6 +67,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) DataSourceAnalysis(conf) :: new DetermineHiveSerde(conf) :: new HiveAnalysis(sparkSession) :: + new FindDataSourceTable(sparkSession) :: + new FindHiveSerdeTable(sparkSession) :: new ResolveDataSource(sparkSession) :: Nil override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 7987a0a84c..b649612a40 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, SimpleCatalogRelation} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation} @@ -127,6 +127,21 @@ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] { } } +/** + * Replaces [[SimpleCatalogRelation]] with [[MetastoreRelation]] if its table provider is hive. 
+ */ +class FindHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) + if DDLUtils.isHiveTable(s.metadata) => + i.copy(table = + MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session)) + + case s: SimpleCatalogRelation if DDLUtils.isHiveTable(s.metadata) => + MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session) + } +} + private[hive] trait HiveStrategies { // Possibly being too clever with types here... or not clever enough. self: SparkPlanner => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index ef5a5a001f..ccc2d64c4a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.hive.execution import scala.util.control.NonFatal import org.apache.spark.sql.{AnalysisException, Row, SparkSession} -import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation} +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.hive.MetastoreRelation @@ -73,7 +73,9 @@ case class CreateHiveTableAsSelectCommand( // Get the Metastore Relation sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match { - case r: MetastoreRelation => r + case SubqueryAlias(_, r: SimpleCatalogRelation, _) => + val tableMeta = r.metadata + MetastoreRelation(tableMeta.database, tableMeta.identifier.table)(tableMeta, sparkSession) } } // TODO ideally, we should get the output data ready first and then diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index dcb8e498a4..3267c237c8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -431,7 +431,7 @@ private[hive] class TestHiveSparkSession( sharedState.cacheManager.clearCache() loadedTables.clear() sessionState.catalog.clearTempTables() - sessionState.catalog.invalidateCache() + sessionState.catalog.tableRelationCache.invalidateAll() sessionState.metadataHive.reset() diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 081f6f6d82..f0e2c9369b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -1322,4 +1322,26 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv sparkSession.sparkContext.conf.set(DEBUG_MODE, previousValue) } } + + test("SPARK-18464: support old table which doesn't store schema in table properties") { + withTable("old") { + withTempPath { path => + Seq(1 -> "a").toDF("i", 
"j").write.parquet(path.getAbsolutePath) + val tableDesc = CatalogTable( + identifier = TableIdentifier("old", Some("default")), + tableType = CatalogTableType.EXTERNAL, + storage = CatalogStorageFormat.empty.copy( + properties = Map("path" -> path.getAbsolutePath) + ), + schema = new StructType(), + properties = Map( + HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet")) + hiveClient.createTable(tableDesc, ignoreIfExists = false) + + checkAnswer(spark.table("old"), Row(1, "a")) + + checkAnswer(sql("DESC old"), Row("i", "int", null) :: Row("j", "string", null) :: Nil) + } + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 0053aa1642..e2fcd2fd41 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -62,7 +62,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, true) - val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier("csv_table")) + val relation = spark.table("csv_table").queryExecution.analyzed.children.head .asInstanceOf[MetastoreRelation] val properties = relation.hiveQlTable.getParameters @@ -80,7 +80,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto test("analyze MetastoreRelations") { def queryTotalSize(tableName: String): BigInt = - spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).stats(conf).sizeInBytes + spark.table(tableName).queryExecution.analyzed.stats(conf).sizeInBytes // Non-partitioned table sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect() @@ -451,7 +451,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS") } // Table lookup will make the table cached. 
- catalog.lookupRelation(tableIndent) + spark.table(tableIndent) statsBeforeUpdate = catalog.getCachedDataSourceTable(tableIndent) .asInstanceOf[LogicalRelation].catalogTable.get.stats.get @@ -461,7 +461,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } else { sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS") } - catalog.lookupRelation(tableIndent) + spark.table(tableIndent) statsAfterUpdate = catalog.getCachedDataSourceTable(tableIndent) .asInstanceOf[LogicalRelation].catalogTable.get.stats.get } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 953e29127f..104b5250b6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException} import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation} @@ -513,8 +514,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { isDataSourceTable: Boolean, format: String, userSpecifiedLocation: Option[String] = None): Unit = { - val relation = EliminateSubqueryAliases( - sessionState.catalog.lookupRelation(TableIdentifier(tableName))) + var relation: LogicalPlan = null + withSQLConf( + HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false", + HiveUtils.CONVERT_METASTORE_ORC.key -> "false") { + relation = EliminateSubqueryAliases(spark.table(tableName).queryExecution.analyzed) + } val catalogTable = sessionState.catalog.getTableMetadata(TableIdentifier(tableName)) relation match { @@ -1021,13 +1026,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { // generates an invalid query plan. val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}""")) read.json(rdd).createOrReplaceTempView("data") - val originalConf = sessionState.conf.convertCTAS - setConf(SQLConf.CONVERT_CTAS, false) - try { + withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") { sql("CREATE TABLE explodeTest (key bigInt)") table("explodeTest").queryExecution.analyzed match { - case metastoreRelation: MetastoreRelation => // OK + case SubqueryAlias(_, r: MetastoreRelation, _) => // OK case _ => fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation") } @@ -1040,8 +1043,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { sql("DROP TABLE explodeTest") dropTempTable("data") - } finally { - setConf(SQLConf.CONVERT_CTAS, originalConf) } } -- GitLab
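Editor's note: below is a minimal, self-contained sketch of the Guava cache pattern this patch adopts for `SessionCatalog.tableRelationCache` and `FindDataSourceTable` (resolve a relation once per qualified table name, serve later lookups from the cache, and invalidate on refresh). It is an illustration only: `QualifiedTableName` is redeclared here to keep the sketch standalone, `FakeLogicalPlan` is a hypothetical placeholder for a resolved plan, and the hardcoded maximum size of 1000 simply mirrors the TODO-marked constant in the patch.

```scala
import java.util.concurrent.Callable

import com.google.common.cache.{Cache, CacheBuilder}

// Hypothetical stand-ins, used only to keep this sketch self-contained;
// they are not the real catalyst classes.
case class QualifiedTableName(database: String, name: String)
case class FakeLogicalPlan(description: String)

object TableRelationCacheSketch {
  // A bounded cache keyed by fully qualified table name, mirroring the
  // tableRelationCache added to SessionCatalog (size 1000 is hardcoded there too).
  val tableRelationCache: Cache[QualifiedTableName, FakeLogicalPlan] =
    CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, FakeLogicalPlan]()

  // Mirrors FindDataSourceTable: resolve the relation once, serve later lookups from the cache.
  def lookup(key: QualifiedTableName)(resolve: => FakeLogicalPlan): FakeLogicalPlan =
    tableRelationCache.get(key, new Callable[FakeLogicalPlan] {
      override def call(): FakeLogicalPlan = resolve
    })

  // Mirrors the new part of SessionCatalog.refreshTable: drop a single stale entry.
  def refresh(key: QualifiedTableName): Unit = tableRelationCache.invalidate(key)

  def main(args: Array[String]): Unit = {
    val key = QualifiedTableName("default", "tbl")
    val first = lookup(key)(FakeLogicalPlan("resolved relation for default.tbl")) // resolves once
    val second = lookup(key)(sys.error("cache hit expected, resolve must not run")) // cache hit
    assert(first eq second)
    refresh(key) // the next lookup would resolve again
  }
}
```

Using Guava's `get(key, Callable)` keeps the "resolve once, reuse afterwards" step atomic per key, which is why `FindDataSourceTable` can populate the shared cache lazily while `refreshTable` only needs to invalidate the affected entry.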