diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index dc2e40424fd5f58b616a118dcb10babd5ce1c33f..360e55d922821ee7e2575db2cc8dc4515b61d2cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -27,7 +27,7 @@ import com.google.common.base.Objects
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -403,14 +403,16 @@ object CatalogTypes {
  */
 case class CatalogRelation(
     tableMeta: CatalogTable,
-    dataCols: Seq[Attribute],
-    partitionCols: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
+    dataCols: Seq[AttributeReference],
+    partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation {
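+  // The columns are typed as `AttributeReference` rather than `Attribute`, so that `output` can
+  // be reused directly as the output of the `LogicalRelation` that replaces this relation.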
   assert(tableMeta.identifier.database.isDefined)
   assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
   assert(tableMeta.dataSchema.sameType(dataCols.toStructType))
 
   // The partition column should always appear after data columns.
-  override def output: Seq[Attribute] = dataCols ++ partitionCols
+  override def output: Seq[AttributeReference] = dataCols ++ partitionCols
 
   def isPartitioned: Boolean = partitionCols.nonEmpty
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e5c7c383d708cf2bffb5147f47c34afb25bca8c8..2d83d512e702d894601c5169b7908bd7db62b24d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -231,16 +231,19 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
             options = table.storage.properties ++ pathOption,
             catalogTable = Some(table))
 
-        LogicalRelation(
-          dataSource.resolveRelation(checkFilesExist = false),
-          catalogTable = Some(table))
+        LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
       }
     }).asInstanceOf[LogicalRelation]
 
-    // It's possible that the table schema is empty and need to be inferred at runtime. We should
-    // not specify expected outputs for this case.
-    val expectedOutputs = if (r.output.isEmpty) None else Some(r.output)
-    plan.copy(expectedOutputAttributes = expectedOutputs)
+    if (r.output.isEmpty) {
+      // It's possible that the table schema is empty and needs to be inferred at runtime. In this
+      // case, we don't need to change the output of the cached plan.
+      plan
+    } else {
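+      // Reuse the output attributes (and their expression ids) of the relation being replaced,
+      // so that references to them elsewhere in the query plan remain valid (see SPARK-10741).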
+      plan.copy(output = r.output)
+    }
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 3b14b794fd08cab1c3d80a7005700f3a596d62fe..421520396007570904fa5f129e5a3d8fde822cb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.BaseRelation
@@ -26,31 +26,13 @@ import org.apache.spark.util.Utils
 
 /**
  * Used to link a [[BaseRelation]] in to a logical query plan.
- *
- * Note that sometimes we need to use `LogicalRelation` to replace an existing leaf node without
- * changing the output attributes' IDs.  The `expectedOutputAttributes` parameter is used for
- * this purpose.  See https://issues.apache.org/jira/browse/SPARK-10741 for more details.
  */
 case class LogicalRelation(
     relation: BaseRelation,
-    expectedOutputAttributes: Option[Seq[Attribute]] = None,
-    catalogTable: Option[CatalogTable] = None)
+    output: Seq[AttributeReference],
+    catalogTable: Option[CatalogTable])
   extends LeafNode with MultiInstanceRelation {
 
-  override val output: Seq[AttributeReference] = {
-    val attrs = relation.schema.toAttributes
-    expectedOutputAttributes.map { expectedAttrs =>
-      assert(expectedAttrs.length == attrs.length)
-      attrs.zip(expectedAttrs).map {
-        // We should respect the attribute names provided by base relation and only use the
-        // exprId in `expectedOutputAttributes`.
-        // The reason is that, some relations(like parquet) will reconcile attribute names to
-        // workaround case insensitivity issue.
-        case (attr, expected) => attr.withExprId(expected.exprId)
-      }
-    }.getOrElse(attrs)
-  }
-
   // Logical Relations are distinct if they have different output for the sake of transformations.
   override def equals(other: Any): Boolean = other match {
     case l @ LogicalRelation(otherRelation, _, _) => relation == otherRelation && output == l.output
@@ -87,11 +69,8 @@ case class LogicalRelation(
-   * unique expression ids. We respect the `expectedOutputAttributes` and create
-   * new instances of attributes in it.
+   * unique expression ids. We create new instances of all the output attributes,
+   * keeping their names, data types and metadata.
    */
-  override def newInstance(): this.type = {
-    LogicalRelation(
-      relation,
-      expectedOutputAttributes.map(_.map(_.newInstance())),
-      catalogTable).asInstanceOf[this.type]
+  override def newInstance(): LogicalRelation = {
+    this.copy(output = output.map(_.newInstance()))
   }
 
   override def refresh(): Unit = relation match {
@@ -101,3 +80,13 @@ case class LogicalRelation(
 
   override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
 }
+
+object LogicalRelation {
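+  // Both constructors derive fresh output attributes from the relation's schema. Callers that
+  // need to preserve existing attribute ids should fix up `output` via `copy` afterwards.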
+  def apply(relation: BaseRelation): LogicalRelation =
+    LogicalRelation(relation, relation.schema.toAttributes, None)
+
+  def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation =
+    LogicalRelation(relation, relation.schema.toAttributes, Some(table))
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 8566a8061034b9436948e7c047fa7467f19f466b..905b8683e10bdeedf91103312b959c2615b7c35b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -59,9 +59,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
         val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
         val prunedFsRelation =
           fsRelation.copy(location = prunedFileIndex)(sparkSession)
-        val prunedLogicalRelation = logicalRelation.copy(
-          relation = prunedFsRelation,
-          expectedOutputAttributes = Some(logicalRelation.output))
+        val prunedLogicalRelation = logicalRelation.copy(relation = prunedFsRelation)
 
         // Keep partition-pruning predicates so that they are visible in physical planning
         val filterExpression = filters.reduceLeft(And)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
index 60adee4599b0bfac00f493348196bf69e3090ff0..6dd4847ead73811e270f5ccc21bf96fefb62d670 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala
@@ -75,13 +75,13 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
            |USING ${classOf[TestOptionsSource].getCanonicalName}
            |OPTIONS (PATH '/tmp/path')
         """.stripMargin)
-      assert(getPathOption("src") == Some("file:/tmp/path"))
+      assert(getPathOption("src").map(makeQualifiedPath) == Some(makeQualifiedPath("/tmp/path")))
     }
 
     // should exist even path option is not specified when creating table
     withTable("src") {
       sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}")
-      assert(getPathOption("src") == Some(CatalogUtils.URIToString(defaultTablePath("src"))))
+      assert(getPathOption("src").map(makeQualifiedPath) == Some(defaultTablePath("src")))
     }
   }
 
@@ -95,9 +95,9 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
             |OPTIONS (PATH '$p')
             |AS SELECT 1
           """.stripMargin)
-        assert(CatalogUtils.stringToURI(
-          spark.table("src").schema.head.metadata.getString("path")) ==
-          makeQualifiedPath(p.getAbsolutePath))
+        assert(
+          spark.table("src").schema.head.metadata.getString("path") ==
+          p.getAbsolutePath)
       }
     }
 
@@ -109,8 +109,9 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
            |USING ${classOf[TestOptionsSource].getCanonicalName}
            |AS SELECT 1
           """.stripMargin)
-      assert(spark.table("src").schema.head.metadata.getString("path") ==
-        CatalogUtils.URIToString(defaultTablePath("src")))
+      assert(
+        makeQualifiedPath(spark.table("src").schema.head.metadata.getString("path")) ==
+        defaultTablePath("src"))
     }
   }
 
@@ -122,13 +123,13 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext {
            |USING ${classOf[TestOptionsSource].getCanonicalName}
            |OPTIONS (PATH '/tmp/path')""".stripMargin)
       sql("ALTER TABLE src SET LOCATION '/tmp/path2'")
-      assert(getPathOption("src") == Some("/tmp/path2"))
+      assert(getPathOption("src").map(makeQualifiedPath) == Some(makeQualifiedPath("/tmp/path2")))
     }
 
     withTable("src", "src2") {
       sql(s"CREATE TABLE src(i int) USING ${classOf[TestOptionsSource].getCanonicalName}")
       sql("ALTER TABLE src RENAME TO src2")
-      assert(getPathOption("src2") == Some(CatalogUtils.URIToString(defaultTablePath("src2"))))
+      assert(getPathOption("src2").map(makeQualifiedPath) == Some(defaultTablePath("src2")))
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 10f432570e94b8fdb6be1fdad5e7e785191a0ae7..6b98066cb76c826d6f27b71b53fa64f17c2841c5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -175,7 +175,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             bucketSpec = None,
             fileFormat = fileFormat,
             options = options)(sparkSession = sparkSession)
-          val created = LogicalRelation(fsRelation, catalogTable = Some(updatedTable))
+          val created = LogicalRelation(fsRelation, updatedTable)
           tableRelationCache.put(tableIdentifier, created)
           created
         }
@@ -203,7 +203,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
                 bucketSpec = None,
                 options = options,
                 className = fileType).resolveRelation(),
-              catalogTable = Some(updatedTable))
+              table = updatedTable)
 
           tableRelationCache.put(tableIdentifier, created)
           created
@@ -212,7 +212,14 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         logicalRelation
       })
     }
-    result.copy(expectedOutputAttributes = Some(relation.output))
+    // The inferred schema may have different field names from the table schema. We should respect
+    // the inferred field names, but reuse the exprIds from the table relation's output.
+    assert(result.output.length == relation.output.length &&
+      result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType })
+    val newOutput = result.output.zip(relation.output).map {
+      case (a1, a2) => a1.withExprId(a2.exprId)
+    }
+    result.copy(output = newOutput)
   }
 
   private def inferIfNeeded(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 2b3f36064c1f8f4f7f554542b8cde9769c5495b9..d3cbf898e2439444448e7279c2520a1a46bb0931 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -329,7 +329,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
         fileFormat = new ParquetFileFormat(),
         options = Map.empty)(sparkSession = spark)
 
-      val plan = LogicalRelation(relation, catalogTable = Some(tableMeta))
+      val plan = LogicalRelation(relation, tableMeta)
       spark.sharedState.cacheManager.cacheQuery(Dataset.ofRows(spark, plan))
 
       assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)
@@ -342,7 +342,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
         bucketSpec = None,
         fileFormat = new ParquetFileFormat(),
         options = Map.empty)(sparkSession = spark)
-      val samePlan = LogicalRelation(sameRelation, catalogTable = Some(tableMeta))
+      val samePlan = LogicalRelation(sameRelation, tableMeta)
 
       assert(spark.sharedState.cacheManager.lookupCachedData(samePlan).isDefined)
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index cd8f94b1cc4f07ad72c031360f86daf0d53168d9..f818e29555468000ba2ecf24689ba8d6d0558f31 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -58,7 +58,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
           fileFormat = new ParquetFileFormat(),
           options = Map.empty)(sparkSession = spark)
 
-        val logicalRelation = LogicalRelation(relation, catalogTable = Some(tableMeta))
+        val logicalRelation = LogicalRelation(relation, tableMeta)
         val query = Project(Seq('i, 'p), Filter('p === 1, logicalRelation)).analyze
 
         val optimized = Optimize.execute(query)