diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 9bb466ac2d29cf6a0a3648ebb098ccfe84aa320e..8f8747e105932d5dd80591f67fae672bfa19d035 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -135,16 +135,25 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   /** Args cleaned so that differences in expression id should not affect equality */
   protected lazy val cleanArgs: Seq[Any] = {
     val input = children.flatMap(_.output)
+    def cleanExpression(e: Expression) = e match {
+      case a: Alias =>
+        // An Alias at the root of an expression always carries an arbitrary exprId, so we
+        // must erase it for equality testing.
+        val cleanedAlias = Alias(a.child, a.name)(ExprId(-1), a.qualifiers)
+        BindReferences.bindReference(cleanedAlias, input, allowFailures = true)
+      case other => BindReferences.bindReference(other, input, allowFailures = true)
+    }
+
     productIterator.map {
       // Children are checked using sameResult above.
       case tn: TreeNode[_] if containsChild(tn) => null
-      case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+      case e: Expression => cleanExpression(e)
       case s: Option[_] => s.map {
-        case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+        case e: Expression => cleanExpression(e)
         case other => other
       }
       case s: Seq[_] => s.map {
-        case e: Expression => BindReferences.bindReference(e, input, allowFailures = true)
+        case e: Expression => cleanExpression(e)
         case other => other
       }
       case other => other
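
Why the Alias id must be erased: Catalyst's `Alias` includes its `exprId` in equality checks, and every analysis run mints fresh ids, so two plans for the same query never compare equal at an aliased projection such as `key+1`; `sameResult` then fails and the cached `InMemoryRelation` is never matched (`allowFailures = true` simply lets binding fall through for expressions that reference no child output). Below is a minimal self-contained sketch of the failure and the fix; the `ExprId`/`Alias` case classes and the `clean` helper are simplified stand-ins for the real Catalyst types, not the actual implementation:

```scala
object AliasEqualitySketch extends App {
  case class ExprId(id: Long)

  // Like Catalyst's Alias, equality includes the exprId: distinct attributes
  // must stay distinguishable while a plan is being analyzed.
  case class Alias(childSql: String, name: String, exprId: ExprId)

  private var counter = 0L
  def newExprId(): ExprId = { counter += 1; ExprId(counter) }

  // Two analyses of the same query mint different ids for "key + 1 AS c" ...
  val first  = Alias("key + 1", "c", newExprId())
  val second = Alias("key + 1", "c", newExprId())
  assert(first != second)               // ... so the plans wrongly compare unequal.

  // The fix above pins the id to a sentinel before comparing (cf. ExprId(-1)).
  def clean(a: Alias): Alias = a.copy(exprId = ExprId(-1))
  assert(clean(first) == clean(second)) // now equal: the cached plan can be reused

  println("alias normalization sketch passed")
}
```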
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 3a3541a8429b60408efa0f898d8220a059a6b55c..84e66b50cf1d781bcfd521a7cd627bd43d4d0ade 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
+
+import org.apache.spark.sql.execution.PhysicalRDD
 
@@ -338,4 +340,18 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
       assert((accsSize - 2) == Accumulators.originals.size)
     }
   }
+
+  test("SPARK-10327 Cache Table is not working while subquery has alias in its project list") {
+    ctx.sparkContext.parallelize((1, 1) :: (2, 2) :: Nil)
+      .toDF("key", "value").selectExpr("key", "value", "key+1").registerTempTable("abc")
+    ctx.cacheTable("abc")
+
+    val sparkPlan = sql(
+      """select a.key, b.key, c.key from
+        |abc a join abc b on a.key=b.key
+        |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan
+
+    assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3)
+    assert(sparkPlan.collect { case e: PhysicalRDD => e }.size === 0)
+  }
 }
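
To observe the fix interactively, here is a sketch of an equivalent 1.5-era spark-shell session (the API calls mirror the test above; the exact `explain()` output formatting varies by version, so treat the expected-plan comment as indicative rather than verbatim):

```scala
// spark-shell (Spark 1.5.x): before this patch the self-join below planned
// fresh PhysicalRDD scans; with it, all three scans hit the cached relation.
import sqlContext.implicits._

sqlContext.sparkContext.parallelize(Seq((1, 1), (2, 2)))
  .toDF("key", "value")
  .selectExpr("key", "value", "key+1")   // the "key+1" alias triggered the bug
  .registerTempTable("abc")

sqlContext.cacheTable("abc")

// The physical plan should now contain three InMemoryColumnarTableScan
// operators and no PhysicalRDD.
sqlContext.sql(
  """select a.key, b.key, c.key from
    |abc a join abc b on a.key = b.key
    |join abc c on a.key = c.key""".stripMargin).explain()
```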