diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e3237a88468756b3bfd988a5b453289dbf01b113..6030d90ed99c395d3955f349556e9c7a132ff650 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -678,12 +678,7 @@ class SessionCatalog(
             child = parser.parsePlan(viewText))
           SubqueryAlias(table, child)
         } else {
-          val tableRelation = CatalogRelation(
-            metadata,
-            // we assume all the columns are nullable.
-            metadata.dataSchema.asNullable.toAttributes,
-            metadata.partitionSchema.asNullable.toAttributes)
-          SubqueryAlias(table, tableRelation)
+          SubqueryAlias(table, UnresolvedCatalogRelation(metadata))
         }
       } else {
         SubqueryAlias(table, tempTables(table))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index f86510624aa7825169ca8ae25b6f1bdbd46aba9e..5a8c4e7610fff09edbb2cf1071b6c0e6b5035907 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
-import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -405,11 +404,22 @@ object CatalogTypes {
   type TablePartitionSpec = Map[String, String]
 }
 
+/**
+ * A placeholder for a table relation, which will be replaced by concrete relation like
+ * `LogicalRelation` or `HiveTableRelation`, during analysis.
+ */
+case class UnresolvedCatalogRelation(tableMeta: CatalogTable) extends LeafNode {
+  assert(tableMeta.identifier.database.isDefined)
+  override lazy val resolved: Boolean = false
+  override def output: Seq[Attribute] = Nil
+}
 
 /**
- * A [[LogicalPlan]] that represents a table.
+ * A `LogicalPlan` that represents a hive table.
+ *
+ * TODO: remove this after we completely make hive as a data source.
  */
-case class CatalogRelation(
+case class HiveTableRelation(
     tableMeta: CatalogTable,
     dataCols: Seq[AttributeReference],
     partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
@@ -423,7 +433,7 @@ case class CatalogRelation(
   def isPartitioned: Boolean = partitionCols.nonEmpty
 
   override def equals(relation: Any): Boolean = relation match {
-    case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output
+    case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output
     case _ => false
   }
 
@@ -431,7 +441,7 @@ case class CatalogRelation(
     Objects.hashCode(tableMeta.identifier, output)
   }
 
-  override lazy val canonicalized: LogicalPlan = copy(
+  override lazy val canonicalized: HiveTableRelation = copy(
     tableMeta = tableMeta.copy(
       storage = CatalogStorageFormat.empty,
       createTime = -1
@@ -444,15 +454,12 @@ case class CatalogRelation(
     })
 
   override def computeStats(): Statistics = {
-    // For data source tables, we will create a `LogicalRelation` and won't call this method, for
-    // hive serde tables, we will always generate a statistics.
-    // TODO: unify the table stats generation.
     tableMeta.stats.map(_.toPlanStats(output)).getOrElse {
       throw new IllegalStateException("table stats must be specified.")
     }
   }
 
-  override def newInstance(): LogicalPlan = copy(
+  override def newInstance(): HiveTableRelation = copy(
     dataCols = dataCols.map(_.newInstance()),
     partitionCols = partitionCols.map(_.newInstance()))
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index d2b670d06d18c5b6b8e31c3dfd049f6f6a6a5164..1cce19989c60e912bd62cf1c10a8c17fef6f2d15 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -518,14 +517,14 @@ abstract class SessionCatalogSuite extends AnalysisTest {
       catalog.setCurrentDatabase("db2")
       // If we explicitly specify the database, we'll look up the relation in that database
       assert(catalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head
-        .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+        .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
       // Otherwise, we'll first look up a temporary table with the same name
       assert(catalog.lookupRelation(TableIdentifier("tbl1"))
         == SubqueryAlias("tbl1", tempTable1))
       // Then, if that does not exist, look up the relation in the current database
       catalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
       assert(catalog.lookupRelation(TableIdentifier("tbl1")).children.head
-        .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
+        .asInstanceOf[UnresolvedCatalogRelation].tableMeta == metastoreTable1)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 65c9ef40777a26f0d6715d40964c4f64fe56cc25..877051a60e910334f1d8eb22bf3fd36a6f26bdf8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -24,11 +24,11 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
 
@@ -372,8 +372,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         // Get all input data source or hive relations of the query.
         val srcRelations = df.logicalPlan.collect {
           case LogicalRelation(src: BaseRelation, _, _) => src
-          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) =>
-            relation.tableMeta.identifier
+          case relation: HiveTableRelation => relation.tableMeta.identifier
         }
 
         val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
@@ -383,8 +382,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
           // check hive table relation when overwrite mode
-          case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta)
-            && srcRelations.contains(relation.tableMeta.identifier) =>
+          case relation: HiveTableRelation
+              if srcRelations.contains(relation.tableMeta.identifier) =>
             throw new AnalysisException(
               s"Cannot overwrite table $tableName that is also being read from")
           case _ => // OK
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index aa968d8b3c34dc4128b74e17a8bbed1e5a1446c0..a9887eb95279f7f4195f5b64c966bb058df8d1e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -36,7 +36,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
@@ -2965,7 +2965,7 @@ class Dataset[T] private[sql](
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
         fr.inputFiles
-      case r: CatalogRelation if DDLUtils.isHiveTable(r.tableMeta) =>
+      case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
     }.flatten
     files.toSet.toArray
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 02f45abaa3f084a7531a2cb74cbdb5e5fc232c53..301c4f02647d5b258e144f599465d1140f3f0ff7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -99,7 +99,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
             val partitionData = fsRelation.location.listFiles(Nil, Nil)
             LocalRelation(partAttrs, partitionData.map(_.values))
 
-          case relation: CatalogRelation =>
+          case relation: HiveTableRelation =>
             val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
             val caseInsensitiveProperties =
               CaseInsensitiveMap(relation.tableMeta.storage.properties)
@@ -135,7 +135,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
         val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
         Some((AttributeSet(partAttrs), l))
 
-      case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+      case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
         val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
         Some((AttributeSet(partAttrs), relation))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 587b9b450ea2afd780723d2019b43352059a3443..237017742770a564718fcb40745b1f471b6b9dd9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -25,12 +25,12 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
@@ -207,15 +207,16 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
 
 
 /**
- * Replaces [[CatalogRelation]] with data source table if its table provider is not hive.
+ * Replaces [[UnresolvedCatalogRelation]] with concrete relation logical plans.
+ *
+ * TODO: we should remove the special handling for hive tables after completely making hive as a
+ * data source.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(r: CatalogRelation): LogicalPlan = {
-    val table = r.tableMeta
+  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
-    val catalogProxy = sparkSession.sessionState.catalog
-
-    val plan = catalogProxy.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
+    val catalog = sparkSession.sessionState.catalog
+    catalog.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
         val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
         val dataSource =
@@ -232,24 +233,30 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
 
         LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
       }
-    }).asInstanceOf[LogicalRelation]
+    })
+  }
 
-    if (r.output.isEmpty) {
-      // It's possible that the table schema is empty and need to be inferred at runtime. For this
-      // case, we don't need to change the output of the cached plan.
-      plan
-    } else {
-      plan.copy(output = r.output)
-    }
+  private def readHiveTable(table: CatalogTable): LogicalPlan = {
+    HiveTableRelation(
+      table,
+      // Hive table columns are always nullable.
+      table.dataSchema.asNullable.toAttributes,
+      table.partitionSchema.asNullable.toAttributes)
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _)
-        if DDLUtils.isDatasourceTable(r.tableMeta) =>
-      i.copy(table = readDataSourceTable(r))
+    case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
+        if DDLUtils.isDatasourceTable(tableMeta) =>
+      i.copy(table = readDataSourceTable(tableMeta))
+
+    case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
+      i.copy(table = readHiveTable(tableMeta))
+
+    case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
+      readDataSourceTable(tableMeta)
 
-    case r: CatalogRelation if DDLUtils.isDatasourceTable(r.tableMeta) =>
-      readDataSourceTable(r)
+    case UnresolvedCatalogRelation(tableMeta) =>
+      readHiveTable(tableMeta)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index cb8dc1e041a9b20a2ef0266e2a30f6fb06705453..84acca242aa41dce2fbd4c6709d57741f3e9b25c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -382,7 +382,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved =>
       table match {
-        case relation: CatalogRelation =>
+        case relation: HiveTableRelation =>
           val metadata = relation.tableMeta
           preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames)
         case LogicalRelation(h: HadoopFsRelation, _, catalogTable) =>
@@ -427,7 +427,7 @@ object PreReadCheck extends (LogicalPlan => Unit) {
 
   private def checkNumInputFileBlockSources(e: Expression, operator: LogicalPlan): Int = {
     operator match {
-      case _: CatalogRelation => 1
+      case _: HiveTableRelation => 1
       case _ @ LogicalRelation(_: HadoopFsRelation, _, _) => 1
       case _: LeafNode => 0
       // UNION ALL has multiple children, but these children do not concurrently use InputFileBlock.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 41569762d3c598b67a49b74c011234d410955c52..5916cd76b8789de2a3474ad0af421b275d980605 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -171,7 +171,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     // Analyze only one column.
     sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
     val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
-      case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
+      case catalogRel: HiveTableRelation => (catalogRel, catalogRel.tableMeta)
       case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
     }.head
     val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 808dc013f170beebbfe09ff7f05fff5fbf97759a..8bab059ed5e846ed89dacf62dea128e2f02010ba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -112,7 +112,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }
 
   def convertToLogicalRelation(
-      relation: CatalogRelation,
+      relation: HiveTableRelation,
       options: Map[String, String],
       fileFormatClass: Class[_ <: FileFormat],
       fileType: String): LogicalRelation = {
@@ -210,7 +210,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         logicalRelation
       })
     }
-    // The inferred schema may have different filed names as the table schema, we should respect
+    // The inferred schema may have different field names as the table schema, we should respect
     // it, but also respect the exprId in table relation output.
     assert(result.output.length == relation.output.length &&
       result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType })
@@ -221,7 +221,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }
 
   private def inferIfNeeded(
-      relation: CatalogRelation,
+      relation: HiveTableRelation,
       options: Map[String, String],
       fileFormat: FileFormat,
       fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 9c60d22d35ce1104bd70d8fe73184b52401aa0a3..ae1e7e72e8c3fe93eb062abfdb181df5e3bca776 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -21,10 +21,9 @@ import java.io.IOException
 import java.util.Locale
 
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.StatsSetupConst
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
@@ -116,7 +115,7 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
 
 class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case relation: CatalogRelation
+    case relation: HiveTableRelation
         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
       val table = relation.tableMeta
       val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
@@ -147,7 +146,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  */
 object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case InsertIntoTable(r: CatalogRelation, partSpec, query, overwrite, ifPartitionNotExists)
+    case InsertIntoTable(r: HiveTableRelation, partSpec, query, overwrite, ifPartitionNotExists)
         if DDLUtils.isHiveTable(r.tableMeta) =>
       InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists)
 
@@ -171,13 +170,13 @@ object HiveAnalysis extends Rule[LogicalPlan] {
 case class RelationConversions(
     conf: SQLConf,
     sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
-  private def isConvertible(relation: CatalogRelation): Boolean = {
+  private def isConvertible(relation: HiveTableRelation): Boolean = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
     serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
       serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
-  private def convert(relation: CatalogRelation): LogicalRelation = {
+  private def convert(relation: HiveTableRelation): LogicalRelation = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
     if (serde.contains("parquet")) {
       val options = Map(ParquetOptions.MERGE_SCHEMA ->
@@ -194,14 +193,14 @@ case class RelationConversions(
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan transformUp {
       // Write path
-      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifPartitionNotExists)
+      case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
         // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
           if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
             !r.isPartitioned && isConvertible(r) =>
         InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
 
       // Read path
-      case relation: CatalogRelation
+      case relation: HiveTableRelation
           if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
         convert(relation)
     }
@@ -229,7 +228,7 @@ private[hive] trait HiveStrategies {
    */
   object HiveTableScans extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case PhysicalOperation(projectList, predicates, relation: CatalogRelation) =>
+      case PhysicalOperation(projectList, predicates, relation: HiveTableRelation) =>
         // Filter out all predicates that only deal with partition keys, these are given to the
         // hive table scan operator to be used for partition pruning.
         val partitionKeyIds = AttributeSet(relation.partitionCols)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index e191071efbf18864d3b8b4975ec1183adb8ff232..896f24f2e223d0e352338dc51ba8e530387680bc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.execution._
@@ -50,7 +50,7 @@ import org.apache.spark.util.Utils
 private[hive]
 case class HiveTableScanExec(
     requestedAttributes: Seq[Attribute],
-    relation: CatalogRelation,
+    relation: HiveTableRelation,
     partitionPruningPred: Seq[Expression])(
     @transient private val sparkSession: SparkSession)
   extends LeafExecNode {
@@ -205,7 +205,7 @@ case class HiveTableScanExec(
     val input: AttributeSeq = relation.output
     HiveTableScanExec(
       requestedAttributes.map(QueryPlan.normalizeExprId(_, input)),
-      relation.canonicalized.asInstanceOf[CatalogRelation],
+      relation.canonicalized,
       QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession)
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c785aca9858205fb1a0c9e22c3944e61544ea0ad..e01198dd53178e9676b5ee0d825651002370539f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1372,6 +1372,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         hiveClient.createTable(tableDesc, ignoreIfExists = false)
 
         checkAnswer(spark.table("old"), Row(1, "a"))
+        checkAnswer(sql("select * from old"), Row(1, "a"))
 
         val expectedSchema = StructType(Seq(
           StructField("i", IntegerType, nullable = true),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 36566bffb93355721ccf45b7a36d976df9ee94bb..71cf79c473b461621a4713da533125ba8ca06c73 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -72,7 +72,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
                |LOCATION '${tempDir.toURI}'""".stripMargin)
 
           val relation = spark.table("csv_table").queryExecution.analyzed.children.head
-            .asInstanceOf[CatalogRelation]
+            .asInstanceOf[HiveTableRelation]
 
           val properties = relation.tableMeta.ignoredProperties
           assert(properties("totalSize").toLong <= 0, "external table totalSize must be <= 0")
@@ -905,7 +905,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
   test("estimates the size of a test Hive serde tables") {
     val df = sql("""SELECT * FROM src""")
     val sizes = df.queryExecution.analyzed.collect {
-      case relation: CatalogRelation => relation.stats.sizeInBytes
+      case relation: HiveTableRelation => relation.stats.sizeInBytes
     }
     assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
     assert(sizes(0).equals(BigInt(5812)),
@@ -965,7 +965,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       () => (),
       metastoreQuery,
       metastoreAnswer,
-      implicitly[ClassTag[CatalogRelation]]
+      implicitly[ClassTag[HiveTableRelation]]
     )
   }
 
@@ -979,7 +979,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
     // Assert src has a size smaller than the threshold.
     val sizes = df.queryExecution.analyzed.collect {
-      case relation: CatalogRelation => relation.stats.sizeInBytes
+      case relation: HiveTableRelation => relation.stats.sizeInBytes
     }
     assert(sizes.size === 2 && sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold
       && sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 45bbb0c674be383a276238445115e4267d4fe8e7..ef3d9b27aad79bdc913faadf95ba4302cfbc1e80 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.TestUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTableType, CatalogUtils}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
@@ -454,7 +454,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       case LogicalRelation(r: HadoopFsRelation, _, _) =>
         if (!isDataSourceTable) {
           fail(
-            s"${classOf[CatalogRelation].getCanonicalName} is expected, but found " +
+            s"${classOf[HiveTableRelation].getCanonicalName} is expected, but found " +
               s"${HadoopFsRelation.getClass.getCanonicalName}.")
         }
         userSpecifiedLocation match {
@@ -464,11 +464,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         }
         assert(catalogTable.provider.get === format)
 
-      case r: CatalogRelation =>
+      case r: HiveTableRelation =>
         if (isDataSourceTable) {
           fail(
             s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
-              s"${classOf[CatalogRelation].getCanonicalName}.")
+              s"${classOf[HiveTableRelation].getCanonicalName}.")
         }
         userSpecifiedLocation match {
           case Some(location) =>
@@ -948,7 +948,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
       sql("CREATE TABLE explodeTest (key bigInt)")
       table("explodeTest").queryExecution.analyzed match {
-        case SubqueryAlias(_, r: CatalogRelation) => // OK
+        case SubqueryAlias(_, r: HiveTableRelation) => // OK
         case _ =>
           fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation")
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 8c855730c31f2f256da027de356103b1db99d2f9..60ccd996d6d5835630acbf3826f681d3f7edeb68 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHive._
@@ -475,7 +475,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
                 }
               } else {
                 queryExecution.analyzed.collectFirst {
-                  case _: CatalogRelation => ()
+                  case _: HiveTableRelation => ()
                 }.getOrElse {
                   fail(s"Expecting no conversion from orc to data sources, " +
                     s"but got:\n$queryExecution")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 23f21e6b9931ef944247ce720398f55cd89336ad..303884da19f09ba6da9c0b0594118b13f8335f9e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogRelation
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
@@ -812,7 +812,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
               }
             } else {
               queryExecution.analyzed.collectFirst {
-                case _: CatalogRelation =>
+                case _: HiveTableRelation =>
               }.getOrElse {
                 fail(s"Expecting no conversion from parquet to data sources, " +
                   s"but got:\n$queryExecution")