diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8008fcd639f148a35008c4d8dc2512889ff7c9cd..e9543f79878b7be955fefe4c836143d5b34b0b95 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -21,13 +21,13 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
 
+import com.google.common.cache.{Cache, CacheBuilder}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
@@ -117,6 +117,14 @@ class SessionCatalog(
     if (conf.caseSensitiveAnalysis) name else name.toLowerCase
   }
 
+  /**
+   * A cache of qualified table names to table relation plans.
+   */
+  val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
+    // TODO: create a config instead of hardcoding 1000 here.
+    CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, LogicalPlan]()
+  }
+
   /**
    * This method is used to make the given path qualified before we
    * store this path in the underlying external catalog. So, when a path
@@ -573,7 +581,7 @@ class SessionCatalog(
       val relationAlias = alias.getOrElse(table)
       if (db == globalTempViewManager.database) {
         globalTempViewManager.get(table).map { viewDef =>
-          SubqueryAlias(relationAlias, viewDef, Some(name))
+          SubqueryAlias(relationAlias, viewDef, None)
         }.getOrElse(throw new NoSuchTableException(db, table))
       } else if (name.database.isDefined || !tempTables.contains(table)) {
         val metadata = externalCatalog.getTable(db, table)
@@ -586,12 +594,12 @@ class SessionCatalog(
             desc = metadata,
             output = metadata.schema.toAttributes,
             child = parser.parsePlan(viewText))
-          SubqueryAlias(relationAlias, child, Option(name))
+          SubqueryAlias(relationAlias, child, Some(name.copy(table = table, database = Some(db))))
         } else {
           SubqueryAlias(relationAlias, SimpleCatalogRelation(metadata), None)
         }
       } else {
-        SubqueryAlias(relationAlias, tempTables(table), Option(name))
+        SubqueryAlias(relationAlias, tempTables(table), None)
       }
     }
   }
@@ -651,14 +659,21 @@ class SessionCatalog(
    * Refresh the cache entry for a metastore table, if any.
    */
   def refreshTable(name: TableIdentifier): Unit = synchronized {
+    val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
+    val tableName = formatTableName(name.table)
+
     // Go through temporary tables and invalidate them.
-    // If the database is defined, this is definitely not a temp table.
+    // If the database is defined, this may be a global temporary view.
     // If the database is not defined, there is a good chance this is a temp table.
     if (name.database.isEmpty) {
-      tempTables.get(formatTableName(name.table)).foreach(_.refresh())
-    } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
-      globalTempViewManager.get(formatTableName(name.table)).foreach(_.refresh())
+      tempTables.get(tableName).foreach(_.refresh())
+    } else if (dbName == globalTempViewManager.database) {
+      globalTempViewManager.get(tableName).foreach(_.refresh())
     }
+
+    // Also invalidate the table relation cache.
+    val qualifiedTableName = QualifiedTableName(dbName, tableName)
+    tableRelationCache.invalidate(qualifiedTableName)
   }
 
   /**
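
Note on the new cache: it is a plain Guava Cache keyed by QualifiedTableName, and refreshTable now drops the corresponding entry in addition to refreshing temp views. A minimal sketch of the interaction, assuming a SessionCatalog instance `catalog` is in scope; the table name is illustrative:

    import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}

    val key = QualifiedTableName("default", "tbl")               // illustrative key
    // getIfPresent follows Guava's contract and returns null on a cache miss.
    val cachedPlan = Option(catalog.tableRelationCache.getIfPresent(key))

    // refreshTable also invalidates the cached relation, so the next lookup re-resolves it.
    catalog.refreshTable(TableIdentifier("tbl", Some("default")))
    assert(catalog.tableRelationCache.getIfPresent(key) == null)
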
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index 834897b85023d950b0e2f082477770226247d666..26697e9867b35589a7ee0a49dd98351c62723452 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -60,9 +60,11 @@ case class TableIdentifier(table: String, database: Option[String])
   override val identifier: String = table
 
   def this(table: String) = this(table, None)
-
 }
 
+/** A fully qualified identifier for a table (i.e., database.tableName) */
+case class QualifiedTableName(database: String, name: String)
+
 object TableIdentifier {
   def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
 }
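
QualifiedTableName is the cache key: unlike TableIdentifier, the database part is mandatory, so a key always names exactly one metastore table. A hypothetical helper showing the conversion (the real code additionally runs both names through formatDatabaseName/formatTableName to honor case sensitivity):

    import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}

    // Hypothetical helper: qualify an identifier with the current database before caching.
    def cacheKey(ident: TableIdentifier, currentDb: String): QualifiedTableName =
      QualifiedTableName(ident.database.getOrElse(currentDb), ident.table)

    cacheKey(TableIdentifier("tbl"), "default")   // == QualifiedTableName("default", "tbl")
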
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 7a7de25acb07013241665c6fefc58a289276d873..f935de68af899a10536712bc4c2c8a549cebbf63 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -436,7 +436,7 @@ class SessionCatalogSuite extends PlanTest {
       == SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None))
     // Otherwise, we'll first look up a temporary table with the same name
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
-      == SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1"))))
+      == SubqueryAlias("tbl1", tempTable1, None))
     // Then, if that does not exist, look up the relation in the current database
     sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
@@ -462,7 +462,7 @@ class SessionCatalogSuite extends PlanTest {
     val tmpView = Range(1, 10, 2, 10)
     catalog.createTempView("vw1", tmpView, overrideIfExists = false)
     val plan = catalog.lookupRelation(TableIdentifier("vw1"), Option("range"))
-    assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1"))))
+    assert(plan == SubqueryAlias("range", tmpView, None))
   }
 
   test("look up view relation") {
@@ -479,7 +479,7 @@ class SessionCatalogSuite extends PlanTest {
     // Look up a view using current database of the session catalog.
     sessionCatalog.setCurrentDatabase("db3")
     comparePlans(sessionCatalog.lookupRelation(TableIdentifier("view1")),
-      SubqueryAlias("view1", view, Some(TableIdentifier("view1"))))
+      SubqueryAlias("view1", view, Some(TableIdentifier("view1", Some("db3")))))
   }
 
   test("table exists") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 7fc03bd5ef37287d7f3438993cfb6364f6f75c62..ff1f0177e8ba0fbf31477f3bfec201cf418de9ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -386,7 +386,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
             relation.catalogTable.identifier
         }
-        EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
+
+        val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
+        EliminateSubqueryAliases(tableRelation) match {
           // check if the table is a data source table (the relation is a BaseRelation).
           case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
             throw new AnalysisException(
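
The write path here, and the ANALYZE/DESCRIBE commands and CatalogImpl below, stop calling SessionCatalog.lookupRelation directly: after this change lookupRelation returns the raw catalog relation (a SimpleCatalogRelation for metastore tables), and resolution to an actual data source or Hive relation happens in the analyzer rules (FindDataSourceTable / FindHiveSerdeTable), which also consult the relation cache. The replacement pattern, sketched with an assumed SparkSession `spark` and TableIdentifier `tableIdentWithDB`:

    import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
    import org.apache.spark.sql.execution.datasources.LogicalRelation

    val analyzed = spark.table(tableIdentWithDB.quotedString).queryExecution.analyzed
    EliminateSubqueryAliases(analyzed) match {
      case _: LogicalRelation => // data source table
      case _                  => // e.g. MetastoreRelation for Hive serde tables
    }
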
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 1340c9bece38c2bbcc3fe884ba7f15dca620cd6c..d024a3673d4ba2eb8c67dd9baa815495a9bce28a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -40,7 +40,8 @@ case class AnalyzeColumnCommand(
     val sessionState = sparkSession.sessionState
     val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+    val relation =
+      EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
 
     // Compute total size
     val (catalogTable: CatalogTable, sizeInBytes: Long) = relation match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 4a994e34aff8542e5701b9efde94ee9b97776f56..30b6cc7617cb3923082f1bd54e8a95369ffed15d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -41,7 +41,8 @@ case class AnalyzeTableCommand(
     val sessionState = sparkSession.sessionState
     val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+    val relation =
+      EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
 
     relation match {
       case relation: CatalogRelation =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 246894813c3b9f3137333c5f830f8229d51550b1..1b596c97a1c4ec7daaab15a59acfed2849989111 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -450,7 +450,7 @@ case class DescribeTableCommand(
       if (metadata.schema.isEmpty) {
         // In older version(prior to 2.1) of Spark, the table schema can be empty and should be
         // inferred at runtime. We should still support it.
-        describeSchema(catalog.lookupRelation(metadata.identifier).schema, result)
+        describeSchema(sparkSession.table(metadata.identifier).schema, result)
       } else {
         describeSchema(metadata.schema, result)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 3d3db06eee0c6cd75fd33ca617736fe40706fbe7..21b07ee85adc81a49ac77c8ade8e9aad362614ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -17,18 +17,17 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import scala.collection.mutable.ArrayBuffer
+import java.util.concurrent.Callable
 
-import org.apache.hadoop.fs.Path
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, SimpleCatalogRelation}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -37,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPa
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -215,37 +215,43 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
 
 /**
- * Replaces [[SimpleCatalogRelation]] with data source table if its table property contains data
- * source information.
+ * Replaces [[SimpleCatalogRelation]] with a data source relation if its provider is not hive.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(
-      sparkSession: SparkSession,
-      simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
-    val table = simpleCatalogRelation.catalogTable
-    val pathOption = table.storage.locationUri.map("path" -> _)
-    val dataSource =
-      DataSource(
-        sparkSession,
-        userSpecifiedSchema = Some(table.schema),
-        partitionColumns = table.partitionColumnNames,
-        bucketSpec = table.bucketSpec,
-        className = table.provider.get,
-        options = table.storage.properties ++ pathOption)
-
-    LogicalRelation(
-      dataSource.resolveRelation(),
-      expectedOutputAttributes = Some(simpleCatalogRelation.output),
-      catalogTable = Some(table))
+  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
+    val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
+    val cache = sparkSession.sessionState.catalog.tableRelationCache
+    val withHiveSupport =
+      sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"
+
+    cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
+      override def call(): LogicalPlan = {
+        val pathOption = table.storage.locationUri.map("path" -> _)
+        val dataSource =
+          DataSource(
+            sparkSession,
+            // In older versions of Spark (prior to 2.1), the table schema can be empty and
+            // should be inferred at runtime. We should still support it.
+            userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
+            partitionColumns = table.partitionColumnNames,
+            bucketSpec = table.bucketSpec,
+            className = table.provider.get,
+            options = table.storage.properties ++ pathOption,
+            // TODO: improve `InMemoryCatalog` and remove this limitation.
+            catalogTable = if (withHiveSupport) Some(table) else None)
+
+        LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
+      }
+    })
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
         if DDLUtils.isDatasourceTable(s.metadata) =>
-      i.copy(table = readDataSourceTable(sparkSession, s))
+      i.copy(table = readDataSourceTable(s.metadata))
 
     case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-      readDataSourceTable(sparkSession, s)
+      readDataSourceTable(s.metadata)
   }
 }
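
FindDataSourceTable now memoizes the resolved relation in SessionCatalog.tableRelationCache through Guava's get-with-loader API, so repeated lookups of the same table skip re-resolving the schema, partitions, and file listing. The caching pattern in isolation, sketched with plain String keys and values:

    import java.util.concurrent.Callable
    import com.google.common.cache.{Cache, CacheBuilder}

    val cache: Cache[String, String] =
      CacheBuilder.newBuilder().maximumSize(1000).build[String, String]()

    // The Callable runs only on a cache miss; later calls return the memoized value.
    val value = cache.get("some-key", new Callable[String] {
      override def call(): String = "expensively computed value"
    })
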
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 9136a83bc2d89049cc2d13b659a918ce2467edbf..3d9f41832bc73495f72fd15c5d1ddd2489699077 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -440,7 +440,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
 
     // If this table is cached as an InMemoryRelation, drop the original
     // cached version and make the new version cached lazily.
-    val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
+    val logicalPlan = sparkSession.table(tableIdent).queryExecution.analyzed
     // Use lookupCachedData directly since RefreshTable also takes databaseName.
     val isCached = sparkSession.sharedState.cacheManager.lookupCachedData(logicalPlan).nonEmpty
     if (isCached) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f4df80fd9c93f0d010154613a724fc892f175ced..621a46adf4fb268d7cf3950cb888df3f7cf5d02a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1626,17 +1626,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(d.size == d.distinct.size)
   }
 
-  test("SPARK-17625: data source table in InMemoryCatalog should guarantee output consistency") {
-    val tableName = "tbl"
-    withTable(tableName) {
-      spark.range(10).select('id as 'i, 'id as 'j).write.saveAsTable(tableName)
-      val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName))
-      val expr = relation.resolve("i")
-      val qe = spark.sessionState.executePlan(Project(Seq(expr), relation))
-      qe.assertAnalyzed()
-    }
-  }
-
   private def verifyNullabilityInFilterExec(
       df: DataFrame,
       expr: String,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 97990a6d9b28c3d0c309ed29fa1c3a8e8eaea696..b4c9e276ece7a26e0a20824e2679eac7a4dac92c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1790,7 +1790,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   test("SET LOCATION for managed table") {
-    withTable("src") {
+    withTable("tbl") {
       withTempDir { dir =>
         sql("CREATE TABLE tbl(i INT) USING parquet")
         sql("INSERT INTO tbl SELECT 1")
@@ -1799,6 +1799,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
           .getTableMetadata(TableIdentifier("tbl")).storage.locationUri.get
 
         sql(s"ALTER TABLE tbl SET LOCATION '${dir.getCanonicalPath}'")
+        spark.catalog.refreshTable("tbl")
         // SET LOCATION won't move data from previous table path to new table path.
         assert(spark.table("tbl").count() == 0)
         // the previous table path should be still there.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index e4b1f6ae3e49e021041c798367b8e64705bdc7fa..faa76b73fde4b4330bafae00e634cf9ae58452f7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,17 +17,15 @@
 
 package org.apache.spark.sql.hive
 
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import com.google.common.util.concurrent.Striped
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.hive.orc.OrcFileFormat
@@ -41,9 +39,7 @@ import org.apache.spark.sql.types._
  */
 private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
   private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
-
-  /** A fully qualified identifier for a table (i.e., database.tableName) */
-  case class QualifiedTableName(database: String, name: String)
+  private lazy val tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
 
   private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
 
@@ -65,45 +61,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 
-  /** A cache of Spark SQL data source tables that have been accessed. */
-  protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
-    val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
-      override def load(in: QualifiedTableName): LogicalPlan = {
-        logDebug(s"Creating new cached data source for $in")
-        val table = sparkSession.sharedState.externalCatalog.getTable(in.database, in.name)
-
-        val pathOption = table.storage.locationUri.map("path" -> _)
-        val dataSource =
-          DataSource(
-            sparkSession,
-            // In older version(prior to 2.1) of Spark, the table schema can be empty and should be
-            // inferred at runtime. We should still support it.
-            userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
-            partitionColumns = table.partitionColumnNames,
-            bucketSpec = table.bucketSpec,
-            className = table.provider.get,
-            options = table.storage.properties ++ pathOption,
-            catalogTable = Some(table))
-
-        LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
-      }
-    }
-
-    CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
-  }
-
-  def refreshTable(tableIdent: TableIdentifier): Unit = {
-    // refreshTable does not eagerly reload the cache. It just invalidate the cache.
-    // Next time when we use the table, it will be populated in the cache.
-    // Since we also cache ParquetRelations converted from Hive Parquet tables and
-    // adding converted ParquetRelations into the cache is not defined in the load function
-    // of the cache (instead, we add the cache entry in convertToParquetRelation),
-    // it is better at here to invalidate the cache to avoid confusing waring logs from the
-    // cache loader (e.g. cannot find data source provider, which is only defined for
-    // data source table.).
-    cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
-  }
-
   def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
     // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
     val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
@@ -111,45 +68,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     new Path(new Path(dbLocation), tblName).toString
   }
 
-  /**
-   * Returns a [[LogicalPlan]] that represents the given table or view from Hive metastore.
-   *
-   * @param tableIdent The name of the table/view that we look up.
-   * @param alias The alias name of the table/view that we look up.
-   * @return a [[LogicalPlan]] that represents the given table or view from Hive metastore.
-   */
-  def lookupRelation(
-      tableIdent: TableIdentifier,
-      alias: Option[String]): LogicalPlan = {
-    val qualifiedTableName = getQualifiedTableName(tableIdent)
-    val table = sparkSession.sharedState.externalCatalog.getTable(
-      qualifiedTableName.database, qualifiedTableName.name)
-
-    if (DDLUtils.isDatasourceTable(table)) {
-      val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
-      val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable, None)
-      // Then, if alias is specified, wrap the table with a Subquery using the alias.
-      // Otherwise, wrap the table with a Subquery using the table name.
-      alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable)
-    } else if (table.tableType == CatalogTableType.VIEW) {
-      val tableIdentifier = table.identifier
-      val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
-      // The relation is a view, so we wrap the relation by:
-      // 1. Add a [[View]] operator over the relation to keep track of the view desc;
-      // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
-      val child = View(
-        desc = table,
-        output = table.schema.toAttributes,
-        child = sparkSession.sessionState.sqlParser.parsePlan(viewText))
-      SubqueryAlias(alias.getOrElse(tableIdentifier.table), child, Option(tableIdentifier))
-    } else {
-      val qualifiedTable =
-        MetastoreRelation(
-          qualifiedTableName.database, qualifiedTableName.name)(table, sparkSession)
-      alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable)
-    }
-  }
-
   private def getCached(
       tableIdentifier: QualifiedTableName,
       pathsInMetastore: Seq[Path],
@@ -159,7 +77,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       expectedBucketSpec: Option[BucketSpec],
       partitionSchema: Option[StructType]): Option[LogicalRelation] = {
 
-    cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+    tableRelationCache.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
       case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
         val cachedRelationFileFormatClass = relation.fileFormat.getClass
@@ -178,7 +96,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
               Some(logical)
             } else {
               // If the cached relation is not updated, we invalidate it right away.
-              cachedDataSourceTables.invalidate(tableIdentifier)
+              tableRelationCache.invalidate(tableIdentifier)
               None
             }
           case _ =>
@@ -187,7 +105,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
                 s"should be stored as $expectedFileFormat. However, we are getting " +
                 s"a ${relation.fileFormat} from the metastore cache. This cached " +
                 s"entry will be invalidated.")
-            cachedDataSourceTables.invalidate(tableIdentifier)
+            tableRelationCache.invalidate(tableIdentifier)
             None
         }
       case other =>
@@ -195,7 +113,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
             s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. " +
             s"This cached entry will be invalidated.")
-        cachedDataSourceTables.invalidate(tableIdentifier)
+        tableRelationCache.invalidate(tableIdentifier)
         None
     }
   }
@@ -270,7 +188,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
           val created = LogicalRelation(relation,
             catalogTable = Some(metastoreRelation.catalogTable))
-          cachedDataSourceTables.put(tableIdentifier, created)
+          tableRelationCache.put(tableIdentifier, created)
           created
         }
 
@@ -298,7 +216,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
                 className = fileType).resolveRelation(),
               catalogTable = Some(metastoreRelation.catalogTable))
 
-          cachedDataSourceTables.put(tableIdentifier, created)
+          tableRelationCache.put(tableIdentifier, created)
           created
         }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index b3cbbedbe1ee30c13040442ac55b1627ac7b9bb3..44ef5cce2ee05493300500825e1cd1f6bc26234b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -27,12 +27,12 @@ import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, Gener
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchTableException}
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
 import org.apache.spark.sql.internal.SQLConf
@@ -58,28 +58,6 @@ private[sql] class HiveSessionCatalog(
     hadoopConf,
     parser) {
 
-  override def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
-    synchronized {
-      val table = formatTableName(name.table)
-      val db = formatDatabaseName(name.database.getOrElse(currentDb))
-      if (db == globalTempViewManager.database) {
-        val relationAlias = alias.getOrElse(table)
-        globalTempViewManager.get(table).map { viewDef =>
-          SubqueryAlias(relationAlias, viewDef, Some(name))
-        }.getOrElse(throw new NoSuchTableException(db, table))
-      } else if (name.database.isDefined || !tempTables.contains(table)) {
-        val newName = name.copy(database = Some(db), table = table)
-        metastoreCatalog.lookupRelation(newName, alias)
-      } else {
-        val relation = tempTables(table)
-        val tableWithQualifiers = SubqueryAlias(table, relation, None)
-        // If an alias was specified by the lookup, wrap the plan in a subquery so that
-        // attributes are properly qualified with this alias.
-        alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers)
-      }
-    }
-  }
-
   // ----------------------------------------------------------------
   // | Methods and fields for interacting with HiveMetastoreCatalog |
   // ----------------------------------------------------------------
@@ -93,15 +71,6 @@ private[sql] class HiveSessionCatalog(
   val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
   val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
 
-  override def refreshTable(name: TableIdentifier): Unit = {
-    super.refreshTable(name)
-    metastoreCatalog.refreshTable(name)
-  }
-
-  def invalidateCache(): Unit = {
-    metastoreCatalog.cachedDataSourceTables.invalidateAll()
-  }
-
   def hiveDefaultTableFilePath(name: TableIdentifier): String = {
     metastoreCatalog.hiveDefaultTableFilePath(name)
   }
@@ -109,7 +78,7 @@ private[sql] class HiveSessionCatalog(
   // For testing only
   private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
     val key = metastoreCatalog.getQualifiedTableName(table)
-    metastoreCatalog.cachedDataSourceTables.getIfPresent(key)
+    sparkSession.sessionState.catalog.tableRelationCache.getIfPresent(key)
   }
 
   override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 4e30d038b198572d9f82c4f424360543bd312c67..d3cef6e0cb0cf4c9999b097e8e9ba209325d61e2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -67,6 +67,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
         DataSourceAnalysis(conf) ::
         new DetermineHiveSerde(conf) ::
         new HiveAnalysis(sparkSession) ::
+        new FindDataSourceTable(sparkSession) ::
+        new FindHiveSerdeTable(sparkSession) ::
         new ResolveDataSource(sparkSession) :: Nil
 
       override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 7987a0a84c728271c7362a9bf64c99fd7721777d..b649612a406d20d9c101bf9cd13443e5b96abbc0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
@@ -127,6 +127,21 @@ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces [[SimpleCatalogRelation]] with [[MetastoreRelation]] if its table provider is hive.
+ */
+class FindHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
+        if DDLUtils.isHiveTable(s.metadata) =>
+      i.copy(table =
+        MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session))
+
+    case s: SimpleCatalogRelation if DDLUtils.isHiveTable(s.metadata) =>
+      MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session)
+  }
+}
+
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
   self: SparkPlanner =>
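
Together with FindDataSourceTable, this rule means an analyzed metastore table now surfaces as either a MetastoreRelation (hive provider) or a LogicalRelation (data source provider) underneath the SubqueryAlias added by lookupRelation, instead of coming pre-resolved out of the session catalog. A sketch of dispatching on the analyzed plan, assuming `plan` is spark.table("t").queryExecution.analyzed:

    import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
    import org.apache.spark.sql.execution.datasources.LogicalRelation
    import org.apache.spark.sql.hive.MetastoreRelation

    plan match {
      case SubqueryAlias(_, _: MetastoreRelation, _) => // Hive serde table
      case SubqueryAlias(_, _: LogicalRelation, _)   => // data source table
      case _                                         => // temp view or a more complex view plan
    }
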
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index ef5a5a001fb6f7b9efc04e2ada3324b4225a5833..ccc2d64c4a70ace5bd544d4cd903d4aed48f21ec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.hive.execution
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.hive.MetastoreRelation
 
@@ -73,7 +73,9 @@ case class CreateHiveTableAsSelectCommand(
 
       // Get the Metastore Relation
       sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
-        case r: MetastoreRelation => r
+        case SubqueryAlias(_, r: SimpleCatalogRelation, _) =>
+          val tableMeta = r.metadata
+          MetastoreRelation(tableMeta.database, tableMeta.identifier.table)(tableMeta, sparkSession)
       }
     }
     // TODO ideally, we should get the output data ready first and then
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index dcb8e498a406a2660aa576ec02bc392883bb3242..3267c237c865a4106131d9d3f05b4eee923f2524 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -431,7 +431,7 @@ private[hive] class TestHiveSparkSession(
       sharedState.cacheManager.clearCache()
       loadedTables.clear()
       sessionState.catalog.clearTempTables()
-      sessionState.catalog.invalidateCache()
+      sessionState.catalog.tableRelationCache.invalidateAll()
 
       sessionState.metadataHive.reset()
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 081f6f6d8263ff195657fad97cd82471f1e490cb..f0e2c9369bd05fbe572cfa8f37f7f7b0ed455a8d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1322,4 +1322,26 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       sparkSession.sparkContext.conf.set(DEBUG_MODE, previousValue)
     }
   }
+
+  test("SPARK-18464: support old table which doesn't store schema in table properties") {
+    withTable("old") {
+      withTempPath { path =>
+        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
+        val tableDesc = CatalogTable(
+          identifier = TableIdentifier("old", Some("default")),
+          tableType = CatalogTableType.EXTERNAL,
+          storage = CatalogStorageFormat.empty.copy(
+            properties = Map("path" -> path.getAbsolutePath)
+          ),
+          schema = new StructType(),
+          properties = Map(
+            HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet"))
+        hiveClient.createTable(tableDesc, ignoreIfExists = false)
+
+        checkAnswer(spark.table("old"), Row(1, "a"))
+
+        checkAnswer(sql("DESC old"), Row("i", "int", null) :: Row("j", "string", null) :: Nil)
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 0053aa1642ce2ed16ea883d4474335748496e8bb..e2fcd2fd41fa13f376720df7e0bcf7bd3fe1cc60 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -62,7 +62,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
         spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, true)
 
-        val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier("csv_table"))
+        val relation = spark.table("csv_table").queryExecution.analyzed.children.head
           .asInstanceOf[MetastoreRelation]
 
         val properties = relation.hiveQlTable.getParameters
@@ -80,7 +80,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
   test("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
-      spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).stats(conf).sizeInBytes
+      spark.table(tableName).queryExecution.analyzed.stats(conf).sizeInBytes
 
     // Non-partitioned table
     sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -451,7 +451,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
         sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
       }
       // Table lookup will make the table cached.
-      catalog.lookupRelation(tableIndent)
+      spark.table(tableIndent)
       statsBeforeUpdate = catalog.getCachedDataSourceTable(tableIndent)
         .asInstanceOf[LogicalRelation].catalogTable.get.stats.get
 
@@ -461,7 +461,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       } else {
         sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
       }
-      catalog.lookupRelation(tableIndent)
+      spark.table(tableIndent)
       statsAfterUpdate = catalog.getCachedDataSourceTable(tableIndent)
         .asInstanceOf[LogicalRelation].catalogTable.get.stats.get
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 953e29127fb88414cb383c634a922d4aca04b72b..104b5250b645881add84798755caf3a2dffe0543 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
@@ -513,8 +514,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       isDataSourceTable: Boolean,
       format: String,
       userSpecifiedLocation: Option[String] = None): Unit = {
-    val relation = EliminateSubqueryAliases(
-      sessionState.catalog.lookupRelation(TableIdentifier(tableName)))
+    var relation: LogicalPlan = null
+    withSQLConf(
+      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
+      HiveUtils.CONVERT_METASTORE_ORC.key -> "false") {
+      relation = EliminateSubqueryAliases(spark.table(tableName).queryExecution.analyzed)
+    }
     val catalogTable =
       sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
     relation match {
@@ -1021,13 +1026,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     // generates an invalid query plan.
     val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
     read.json(rdd).createOrReplaceTempView("data")
-    val originalConf = sessionState.conf.convertCTAS
-    setConf(SQLConf.CONVERT_CTAS, false)
 
-    try {
+    withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
       sql("CREATE TABLE explodeTest (key bigInt)")
       table("explodeTest").queryExecution.analyzed match {
-        case metastoreRelation: MetastoreRelation => // OK
+        case SubqueryAlias(_, r: MetastoreRelation, _) => // OK
         case _ =>
           fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation")
       }
@@ -1040,8 +1043,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
       sql("DROP TABLE explodeTest")
       dropTempTable("data")
-    } finally {
-      setConf(SQLConf.CONVERT_CTAS, originalConf)
     }
   }