From b50b34f5611a1f182ba9b6eaf86c666bbd9f9eb0 Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Thu, 22 Sep 2016 12:52:09 +0800
Subject: [PATCH] [SPARK-17609][SQL] SessionCatalog.tableExists should not
 check temp view

## What changes were proposed in this pull request?

After #15054, no code path in Spark SQL needs `SessionCatalog.tableExists` to check temp views, so this PR makes `SessionCatalog.tableExists` check only permanent tables/views and removes some hacks.
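
For example, a temporary view no longer counts for `tableExists`. A minimal sketch of the new behavior (assuming an active `spark` session and access to the internal `SessionCatalog`, which is only visible inside Spark's own `org.apache.spark.sql` packages and tests; the view name `v` is illustrative):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

val catalog = spark.sessionState.catalog
spark.range(10).createTempView("v")

// tableExists now resolves the (current) database and consults only the
// external catalog, so the temp view "v" is not reported as existing.
assert(!catalog.tableExists(TableIdentifier("v")))
```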

This PR also improves `getTempViewOrPermanentTableMetadata`, introduced in #15054: it now takes a full `TableIdentifier` and only falls back to temp views when no database is specified, which simplifies its call sites.
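
Under the same assumptions as the sketch above (`default.tbl` is a hypothetical permanent table), the resolution order of the reworked method depends only on whether the identifier carries a database:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType

// No database in the identifier: the temp view "v" is tried first and wins.
val viewMeta = catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("v"))
assert(viewMeta.tableType == CatalogTableType.VIEW)

// Explicit database: temp views are skipped; this returns the permanent
// table's metadata, or throws NoSuchTableException if default.tbl is absent.
val tableMeta =
  catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("tbl", Some("default")))
```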

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15160 from cloud-fan/exists.
---
 .../sql/catalyst/catalog/SessionCatalog.scala | 70 +++++++++----------
 .../catalog/SessionCatalogSuite.scala         | 30 ++++----
 .../apache/spark/sql/DataFrameWriter.scala    |  9 +--
 .../command/createDataSourceTables.scala      | 15 ++--
 .../spark/sql/execution/command/ddl.scala     | 43 +++++-------
 .../spark/sql/execution/command/tables.scala  | 17 +----
 .../spark/sql/internal/CatalogImpl.scala      |  6 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala  |  2 +-
 .../sql/hive/execution/HiveDDLSuite.scala     |  4 +-
 9 files changed, 81 insertions(+), 115 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index ef29c75c01..8c01c7a3f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -245,6 +245,16 @@ class SessionCatalog(
     externalCatalog.alterTable(newTableDefinition)
   }
 
+  /**
+   * Return whether a table/view with the specified name exists. If no database is specified, check
+   * with the current database.
+   */
+  def tableExists(name: TableIdentifier): Boolean = synchronized {
+    val db = formatDatabaseName(name.database.getOrElse(currentDb))
+    val table = formatTableName(name.table)
+    externalCatalog.tableExists(db, table)
+  }
+
   /**
    * Retrieve the metadata of an existing permanent table/view. If no database is specified,
    * assume the table/view is in the current database. If the specified table/view is not found
@@ -270,24 +280,6 @@ class SessionCatalog(
     externalCatalog.getTableOption(db, table)
   }
 
-  /**
-   * Retrieve the metadata of an existing temporary view or permanent table/view.
-   * If the temporary view does not exist, tries to get the metadata an existing permanent
-   * table/view. If no database is specified, assume the table/view is in the current database.
-   * If the specified table/view is not found in the database then a [[NoSuchTableException]] is
-   * thrown.
-   */
-  def getTempViewOrPermanentTableMetadata(name: String): CatalogTable = synchronized {
-    val table = formatTableName(name)
-    getTempView(table).map { plan =>
-      CatalogTable(
-        identifier = TableIdentifier(table),
-        tableType = CatalogTableType.VIEW,
-        storage = CatalogStorageFormat.empty,
-        schema = plan.output.toStructType)
-    }.getOrElse(getTableMetadata(TableIdentifier(name)))
-  }
-
   /**
    * Load files stored in given path into an existing metastore table.
    * If no database is specified, assume the table is in the current database.
@@ -368,6 +360,30 @@ class SessionCatalog(
   // | Methods that interact with temporary and metastore tables |
   // -------------------------------------------------------------
 
+  /**
+   * Retrieve the metadata of an existing temporary view or permanent table/view.
+   *
+   * If a database is specified in `name`, this will return the metadata of the table/view in
+   * that database.
+   * If no database is specified, this will first attempt to get the metadata of a temporary view
+   * with the same name and, if that does not exist, return the metadata of the table/view in the
+   * current database.
+   */
+  def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
+    val table = formatTableName(name.table)
+    if (name.database.isDefined) {
+      getTableMetadata(name)
+    } else {
+      getTempView(table).map { plan =>
+        CatalogTable(
+          identifier = TableIdentifier(table),
+          tableType = CatalogTableType.VIEW,
+          storage = CatalogStorageFormat.empty,
+          schema = plan.output.toStructType)
+      }.getOrElse(getTableMetadata(name))
+    }
+  }
+
   /**
    * Rename a table.
    *
@@ -449,24 +465,6 @@ class SessionCatalog(
     }
   }
 
-  /**
-   * Return whether a table/view with the specified name exists.
-   *
-   * Note: If a database is explicitly specified, then this will return whether the table/view
-   * exists in that particular database instead. In that case, even if there is a temporary
-   * table with the same name, we will return false if the specified database does not
-   * contain the table/view.
-   */
-  def tableExists(name: TableIdentifier): Boolean = synchronized {
-    val db = formatDatabaseName(name.database.getOrElse(currentDb))
-    val table = formatTableName(name.table)
-    if (isTemporaryTable(name)) {
-      true
-    } else {
-      externalCatalog.tableExists(db, table)
-    }
-  }
-
   /**
    * Return whether a table with the specified name is a temporary table.
    *
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 384a730861..915ed8f8b1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -425,35 +425,37 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
     // If database is explicitly specified, do not check temporary tables
     val tempTable = Range(1, 10, 1, 10)
-    catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
     assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
     // If database is not explicitly specified, check the current database
     catalog.setCurrentDatabase("db2")
     assert(catalog.tableExists(TableIdentifier("tbl1")))
     assert(catalog.tableExists(TableIdentifier("tbl2")))
-    assert(catalog.tableExists(TableIdentifier("tbl3")))
-  }
 
-  test("tableExists on temporary views") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable = Range(1, 10, 2, 10)
-    assert(!catalog.tableExists(TableIdentifier("view1")))
-    assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
-    catalog.createTempView("view1", tempTable, overrideIfExists = false)
-    assert(catalog.tableExists(TableIdentifier("view1")))
-    assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
+    catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
+    // tableExists should not check temp views.
+    assert(!catalog.tableExists(TableIdentifier("tbl3")))
   }
 
   test("getTempViewOrPermanentTableMetadata on temporary views") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val tempTable = Range(1, 10, 2, 10)
     intercept[NoSuchTableException] {
-      catalog.getTempViewOrPermanentTableMetadata("view1")
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1"))
+    }.getMessage
+
+    intercept[NoSuchTableException] {
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
     }.getMessage
 
     catalog.createTempView("view1", tempTable, overrideIfExists = false)
-    assert(catalog.getTempViewOrPermanentTableMetadata("view1").identifier ==
-      TableIdentifier("view1"), "the temporary view `view1` should exist")
+    assert(catalog.getTempViewOrPermanentTableMetadata(
+      TableIdentifier("view1")).identifier.table == "view1")
+    assert(catalog.getTempViewOrPermanentTableMetadata(
+      TableIdentifier("view1")).schema(0).name == "id")
+
+    intercept[NoSuchTableException] {
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
+    }.getMessage
   }
 
   test("list tables without pattern") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9e343b5d24..64d3422cb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -361,12 +361,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
     }
 
-    val sessionState = df.sparkSession.sessionState
-    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = tableIdent.copy(database = Some(db))
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
-    val tableExists = sessionState.catalog.tableExists(tableIdentWithDB)
+    val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
@@ -392,7 +387,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           bucketSpec = getBucketSpec
         )
         val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
-        sessionState.executePlan(cmd).toRdd
+        df.sparkSession.sessionState.executePlan(cmd).toRdd
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index d8e20b09c1..a04a13e698 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -47,15 +47,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     assert(table.provider.isDefined)
 
     val sessionState = sparkSession.sessionState
-    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = table.identifier.copy(database = Some(db))
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
-    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+    if (sessionState.catalog.tableExists(table.identifier)) {
       if (ignoreIfExists) {
         return Seq.empty[Row]
       } else {
-        throw new AnalysisException(s"Table ${tableIdentWithDB.unquotedString} already exists.")
+        throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
       }
     }
 
@@ -146,8 +142,6 @@ case class CreateDataSourceTableAsSelectCommand(
 
     var createMetastoreTable = false
     var existingSchema = Option.empty[StructType]
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
     if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
       // Check if we need to throw an exception or just return.
       mode match {
@@ -172,8 +166,9 @@ case class CreateDataSourceTableAsSelectCommand(
           // TODO: Check that options from the resolved relation match the relation that we are
           // inserting into (i.e. using the same compression).
 
-          EliminateSubqueryAliases(
-            sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
+          // Pass a table identifier with the database part, so that `lookupRelation` won't get
+          // temp views unexpectedly.
+          EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
             case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
               // check if the file formats match
               l.relation match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index b57b2d280d..01ac89868d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -183,32 +183,25 @@ case class DropTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (!catalog.tableExists(tableName)) {
-      if (!ifExists) {
-        val objectName = if (isView) "View" else "Table"
-        throw new AnalysisException(s"$objectName to drop '$tableName' does not exist")
-      }
-    } else {
-      // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
-      // issue an exception.
-      catalog.getTableMetadataOption(tableName).map(_.tableType match {
-        case CatalogTableType.VIEW if !isView =>
-          throw new AnalysisException(
-            "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
-        case o if o != CatalogTableType.VIEW && isView =>
-          throw new AnalysisException(
-            s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
-        case _ =>
-      })
-      try {
-        sparkSession.sharedState.cacheManager.uncacheQuery(
-          sparkSession.table(tableName.quotedString))
-      } catch {
-        case NonFatal(e) => log.warn(e.toString, e)
-      }
-      catalog.refreshTable(tableName)
-      catalog.dropTable(tableName, ifExists, purge)
+    // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view,
+    // issue an exception.
+    catalog.getTableMetadataOption(tableName).map(_.tableType match {
+      case CatalogTableType.VIEW if !isView =>
+        throw new AnalysisException(
+          "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
+      case o if o != CatalogTableType.VIEW && isView =>
+        throw new AnalysisException(
+          s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
+      case _ =>
+    })
+    try {
+      sparkSession.sharedState.cacheManager.uncacheQuery(
+        sparkSession.table(tableName.quotedString))
+    } catch {
+      case NonFatal(e) => log.warn(e.toString, e)
     }
+    catalog.refreshTable(tableName)
+    catalog.dropTable(tableName, ifExists, purge)
     Seq.empty[Row]
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 94b46c5d97..0f61629317 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -59,16 +59,7 @@ case class CreateTableLikeCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (!catalog.tableExists(sourceTable)) {
-      throw new AnalysisException(
-        s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
-    }
-
-    val sourceTableDesc = if (sourceTable.database.isDefined) {
-      catalog.getTableMetadata(sourceTable)
-    } else {
-      catalog.getTempViewOrPermanentTableMetadata(sourceTable.table)
-    }
+    val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)
 
     // Storage format
     val newStorage =
@@ -602,11 +593,7 @@ case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableComman
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val table = if (tableName.database.isDefined) {
-      catalog.getTableMetadata(tableName)
-    } else {
-      catalog.getTempViewOrPermanentTableMetadata(tableName.table)
-    }
+    val table = catalog.getTempViewOrPermanentTableMetadata(tableName)
     table.schema.map { c =>
       Row(c.name)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 6fecda232a..f252535765 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -151,11 +151,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
-    val tableMetadata = if (tableIdentifier.database.isDefined) {
-      sessionCatalog.getTableMetadata(tableIdentifier)
-    } else {
-      sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier.table)
-    }
+    val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier)
 
     val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
     val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7143adf02b..8ae6868c98 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -515,7 +515,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           assert(
             intercept[AnalysisException] {
               sparkSession.catalog.createExternalTable("createdJsonTable", jsonFilePath.toString)
-            }.getMessage.contains("Table default.createdJsonTable already exists."),
+            }.getMessage.contains("Table createdJsonTable already exists."),
             "We should complain that createdJsonTable already exists")
         }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 38482f66a3..c927e5d802 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -678,8 +678,8 @@ class HiveDDLSuite
           .createTempView(sourceViewName)
         sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
 
-        val sourceTable =
-          spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(sourceViewName)
+        val sourceTable = spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(
+          TableIdentifier(sourceViewName))
         val targetTable = spark.sessionState.catalog.getTableMetadata(
           TableIdentifier(targetTabName, Some("default")))
 
-- 
GitLab