diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index ef29c75c0189878579d34223b49452f34e6f00b2..8c01c7a3f2bd50b76c96c213fd58d7b8c8a38371 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -245,6 +245,16 @@ class SessionCatalog(
     externalCatalog.alterTable(newTableDefinition)
   }
 
+  /**
+   * Return whether a table/view with the specified name exists. If no database is specified,
+   * check with the current database. Note that temporary views are never checked by this method.
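+   *
+   * A hypothetical usage sketch (assuming `catalog` is an in-scope `SessionCatalog`):
+   * {{{
+   *   catalog.tableExists(TableIdentifier("tbl1"))               // checked against the current database
+   *   catalog.tableExists(TableIdentifier("tbl1", Some("db2")))  // checked against db2 only
+   * }}}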
+   */
+  def tableExists(name: TableIdentifier): Boolean = synchronized {
+    val db = formatDatabaseName(name.database.getOrElse(currentDb))
+    val table = formatTableName(name.table)
+    externalCatalog.tableExists(db, table)
+  }
+
   /**
    * Retrieve the metadata of an existing permanent table/view. If no database is specified,
    * assume the table/view is in the current database. If the specified table/view is not found
@@ -270,24 +280,6 @@ class SessionCatalog(
     externalCatalog.getTableOption(db, table)
   }
 
-  /**
-   * Retrieve the metadata of an existing temporary view or permanent table/view.
-   * If the temporary view does not exist, tries to get the metadata an existing permanent
-   * table/view. If no database is specified, assume the table/view is in the current database.
-   * If the specified table/view is not found in the database then a [[NoSuchTableException]] is
-   * thrown.
-   */
-  def getTempViewOrPermanentTableMetadata(name: String): CatalogTable = synchronized {
-    val table = formatTableName(name)
-    getTempView(table).map { plan =>
-      CatalogTable(
-        identifier = TableIdentifier(table),
-        tableType = CatalogTableType.VIEW,
-        storage = CatalogStorageFormat.empty,
-        schema = plan.output.toStructType)
-    }.getOrElse(getTableMetadata(TableIdentifier(name)))
-  }
-
   /**
    * Load files stored in given path into an existing metastore table.
    * If no database is specified, assume the table is in the current database.
@@ -368,6 +360,30 @@ class SessionCatalog(
   // | Methods that interact with temporary and metastore tables |
   // -------------------------------------------------------------
 
+  /**
+   * Retrieve the metadata of an existing temporary view or permanent table/view.
+   *
+   * If a database is specified in `name`, this returns the metadata of the table/view in that
+   * database; temporary views are ignored in this case.
+   * If no database is specified, this first attempts to get the metadata of a temporary view
+   * with the same name and, if no such view exists, returns the metadata of the table/view in
+   * the current database.
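+   *
+   * A sketch of the lookup order (assuming a temporary view `v` shadows a permanent table
+   * `default.v`):
+   * {{{
+   *   getTempViewOrPermanentTableMetadata(TableIdentifier("v"))                  // temp view metadata
+   *   getTempViewOrPermanentTableMetadata(TableIdentifier("v", Some("default"))) // permanent table metadata
+   * }}}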
+   */
+  def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
+    val table = formatTableName(name.table)
+    if (name.database.isDefined) {
+      getTableMetadata(name)
+    } else {
+      getTempView(table).map { plan =>
+        CatalogTable(
+          identifier = TableIdentifier(table),
+          tableType = CatalogTableType.VIEW,
+          storage = CatalogStorageFormat.empty,
+          schema = plan.output.toStructType)
+      }.getOrElse(getTableMetadata(name))
+    }
+  }
+
   /**
    * Rename a table.
    *
@@ -449,24 +465,6 @@ class SessionCatalog(
     }
   }
 
-  /**
-   * Return whether a table/view with the specified name exists.
-   *
-   * Note: If a database is explicitly specified, then this will return whether the table/view
-   * exists in that particular database instead. In that case, even if there is a temporary
-   * table with the same name, we will return false if the specified database does not
-   * contain the table/view.
-   */
-  def tableExists(name: TableIdentifier): Boolean = synchronized {
-    val db = formatDatabaseName(name.database.getOrElse(currentDb))
-    val table = formatTableName(name.table)
-    if (isTemporaryTable(name)) {
-      true
-    } else {
-      externalCatalog.tableExists(db, table)
-    }
-  }
-
   /**
    * Return whether a table with the specified name is a temporary table.
    *
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 384a7308615e574ae913f9aa82fa3be0c2f08995..915ed8f8b1787ed1dd6c80b3b2b968a3a04b890b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -425,35 +425,37 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
     // If database is explicitly specified, do not check temporary tables
     val tempTable = Range(1, 10, 1, 10)
-    catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
     assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
     // If database is not explicitly specified, check the current database
     catalog.setCurrentDatabase("db2")
     assert(catalog.tableExists(TableIdentifier("tbl1")))
     assert(catalog.tableExists(TableIdentifier("tbl2")))
-    assert(catalog.tableExists(TableIdentifier("tbl3")))
-  }
 
-  test("tableExists on temporary views") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable = Range(1, 10, 2, 10)
-    assert(!catalog.tableExists(TableIdentifier("view1")))
-    assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
-    catalog.createTempView("view1", tempTable, overrideIfExists = false)
-    assert(catalog.tableExists(TableIdentifier("view1")))
-    assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
+    catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
+    // `tableExists` should not check temp views.
+    assert(!catalog.tableExists(TableIdentifier("tbl3")))
   }
 
   test("getTempViewOrPermanentTableMetadata on temporary views") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val tempTable = Range(1, 10, 2, 10)
     intercept[NoSuchTableException] {
-      catalog.getTempViewOrPermanentTableMetadata("view1")
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1"))
+    }.getMessage
+
+    intercept[NoSuchTableException] {
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
     }.getMessage
 
     catalog.createTempView("view1", tempTable, overrideIfExists = false)
-    assert(catalog.getTempViewOrPermanentTableMetadata("view1").identifier ==
-      TableIdentifier("view1"), "the temporary view `view1` should exist")
+    assert(catalog.getTempViewOrPermanentTableMetadata(
+      TableIdentifier("view1")).identifier.table == "view1")
+    assert(catalog.getTempViewOrPermanentTableMetadata(
+      TableIdentifier("view1")).schema(0).name == "id")
+
+    intercept[NoSuchTableException] {
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
+    }.getMessage
   }
 
   test("list tables without pattern") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9e343b5d2498667fbefded0c188428f5127d7090..64d3422cb4b544a1650d99dd12b73c61bb4b578b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -361,12 +361,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
     }
 
-    val sessionState = df.sparkSession.sessionState
-    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = tableIdent.copy(database = Some(db))
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
-    val tableExists = sessionState.catalog.tableExists(tableIdentWithDB)
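+    // `tableExists` no longer consults temporary views, so the raw identifier can be passed
+    // directly; the database part defaults to the current database inside the catalog.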
+    val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
@@ -392,7 +387,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           bucketSpec = getBucketSpec
         )
         val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
-        sessionState.executePlan(cmd).toRdd
+        df.sparkSession.sessionState.executePlan(cmd).toRdd
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index d8e20b09c1addf04c6bd5ddd56f27383ca72d4bd..a04a13e698c432b126ef5603106c640bff2fa457 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -47,15 +47,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     assert(table.provider.isDefined)
 
     val sessionState = sparkSession.sessionState
-    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = table.identifier.copy(database = Some(db))
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
-    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+    if (sessionState.catalog.tableExists(table.identifier)) {
       if (ignoreIfExists) {
         return Seq.empty[Row]
       } else {
-        throw new AnalysisException(s"Table ${tableIdentWithDB.unquotedString} already exists.")
+        throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
       }
     }
 
@@ -146,8 +142,6 @@ case class CreateDataSourceTableAsSelectCommand(
 
     var createMetastoreTable = false
     var existingSchema = Option.empty[StructType]
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
     if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
       // Check if we need to throw an exception or just return.
       mode match {
@@ -172,8 +166,9 @@ case class CreateDataSourceTableAsSelectCommand(
           // TODO: Check that options from the resolved relation match the relation that we are
           // inserting into (i.e. using the same compression).
 
-          EliminateSubqueryAliases(
-            sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
+          // Pass a table identifier with the database part so that `lookupRelation` won't
+          // resolve to temp views unexpectedly.
+          EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
             case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
               // check if the file formats match
               l.relation match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index b57b2d280d8f830cad30251b913c10924b7a04ae..01ac89868d10097230cb3c87a02bd40019d01255 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -183,32 +183,25 @@ case class DropTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (!catalog.tableExists(tableName)) {
-      if (!ifExists) {
-        val objectName = if (isView) "View" else "Table"
-        throw new AnalysisException(s"$objectName to drop '$tableName' does not exist")
-      }
-    } else {
-      // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
-      // issue an exception.
-      catalog.getTableMetadataOption(tableName).map(_.tableType match {
-        case CatalogTableType.VIEW if !isView =>
-          throw new AnalysisException(
-            "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
-        case o if o != CatalogTableType.VIEW && isView =>
-          throw new AnalysisException(
-            s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
-        case _ =>
-      })
-      try {
-        sparkSession.sharedState.cacheManager.uncacheQuery(
-          sparkSession.table(tableName.quotedString))
-      } catch {
-        case NonFatal(e) => log.warn(e.toString, e)
-      }
-      catalog.refreshTable(tableName)
-      catalog.dropTable(tableName, ifExists, purge)
+    // If DROP VIEW is used to drop a table, or DROP TABLE is used to drop a view,
+    // issue an exception.
+    catalog.getTableMetadataOption(tableName).map(_.tableType match {
+      case CatalogTableType.VIEW if !isView =>
+        throw new AnalysisException(
+          "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
+      case o if o != CatalogTableType.VIEW && isView =>
+        throw new AnalysisException(
+          s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
+      case _ =>
+    })
+    try {
+      sparkSession.sharedState.cacheManager.uncacheQuery(
+        sparkSession.table(tableName.quotedString))
+    } catch {
+      case NonFatal(e) => log.warn(e.toString, e)
     }
+    catalog.refreshTable(tableName)
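+    // No explicit existence check is needed here: `dropTable` itself honors `ifExists`
+    // when the table is missing.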
+    catalog.dropTable(tableName, ifExists, purge)
     Seq.empty[Row]
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 94b46c5d97155e1e372155d03e8d422ce4b9aef7..0f61629317c81da0758ef2f2499ffffe797e95ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -59,16 +59,7 @@ case class CreateTableLikeCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (!catalog.tableExists(sourceTable)) {
-      throw new AnalysisException(
-        s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
-    }
-
-    val sourceTableDesc = if (sourceTable.database.isDefined) {
-      catalog.getTableMetadata(sourceTable)
-    } else {
-      catalog.getTempViewOrPermanentTableMetadata(sourceTable.table)
-    }
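+    // `getTempViewOrPermanentTableMetadata` throws a `NoSuchTableException` itself when the
+    // source table is missing, so no separate existence check is needed.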
+    val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)
 
     // Storage format
     val newStorage =
@@ -602,11 +593,7 @@ case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableComman
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val table = if (tableName.database.isDefined) {
-      catalog.getTableMetadata(tableName)
-    } else {
-      catalog.getTempViewOrPermanentTableMetadata(tableName.table)
-    }
+    val table = catalog.getTempViewOrPermanentTableMetadata(tableName)
     table.schema.map { c =>
       Row(c.name)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 6fecda232ab884dadd35ed31688993175814fc26..f2525357658993b6e852990794072294e0feecd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -151,11 +151,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
-    val tableMetadata = if (tableIdentifier.database.isDefined) {
-      sessionCatalog.getTableMetadata(tableIdentifier)
-    } else {
-      sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier.table)
-    }
+    val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier)
 
     val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
     val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7143adf02b0e69ebbeda084cf5e8337ce7ff8732..8ae6868c9848a779a15f89905a67ae9de5debb28 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -515,7 +515,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           assert(
             intercept[AnalysisException] {
               sparkSession.catalog.createExternalTable("createdJsonTable", jsonFilePath.toString)
-            }.getMessage.contains("Table default.createdJsonTable already exists."),
+            }.getMessage.contains("Table createdJsonTable already exists."),
             "We should complain that createdJsonTable already exists")
         }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 38482f66a38e96b518852248e8d04fa2c2ae50ac..c927e5d802c90226753bafa226183858d7c00974 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -678,8 +678,8 @@ class HiveDDLSuite
           .createTempView(sourceViewName)
         sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
 
-        val sourceTable =
-          spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(sourceViewName)
+        val sourceTable = spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(
+          TableIdentifier(sourceViewName))
         val targetTable = spark.sessionState.catalog.getTableMetadata(
           TableIdentifier(targetTabName, Some("default")))