diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 83ea302013ebf002597c48e892cc321ccd0c5900..059d8ff87b7ec2ed89df973381f088022dec901c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -312,6 +312,15 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
  *   - LeftSemiJoin
  */
 object ColumnPruning extends Rule[LogicalPlan] {
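+  /**
+   * Returns true when the two attribute lists are pairwise semantically equal,
+   * i.e. they reference the same columns even when the attributes differ in
+   * cosmetic details such as qualifiers.
+   */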
+  def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
+    output1.size == output2.size &&
+      output1.zip(output2).forall { case (a1, a2) => a1.semanticEquals(a2) }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // Prunes the unused columns from project list of Project/Aggregate/Window/Expand
     case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty =>
@@ -378,7 +387,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
     case p @ Project(_, l: LeafNode) => p
 
     // Eliminate no-op Projects
-    case p @ Project(projectList, child) if child.output == p.output => child
+    case p @ Project(_, child) if sameOutput(child.output, p.output) => child
 
     // for all other logical plans that inherit the output from their children
     case p @ Project(_, child) =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index 5cab1fc95a3644b0fc5c40aacdc9807374544cb7..d09601e0343d795c729ce0678c39d6d631c5d392 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Explode, Literal, SortOrder}
-import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.types.StringType
@@ -252,6 +252,29 @@ class ColumnPruningSuite extends PlanTest {
     comparePlans(Optimize.execute(query), expected)
   }
 
+  test("Remove redundant projects in column pruning rule") {
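+    // Once the key columns are pushed below the join, the outermost Project
+    // re-lists exactly the join's output and should be eliminated as a no-op.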
+    val input = LocalRelation('key.int, 'value.string)
+
+    val query =
+      Project(Seq($"x.key", $"y.key"),
+        Join(
+          SubqueryAlias("x", input),
+          BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze
+
+    val optimized = Optimize.execute(query)
+
+    val expected =
+      Join(
+        Project(Seq($"x.key"), SubqueryAlias("x", input)),
+        BroadcastHint(
+          Project(Seq($"y.key"), SubqueryAlias("y", input))),
+        Inner, None).analyze
+
+    comparePlans(optimized, expected)
+  }
+
   implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
   private val func = identity[Iterator[OtherTuple]] _
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index 2f382bbda0c5856abed97c073568b555fa1e16cd..e2f8146beee7bc0fe8409fec4301f9a77b12fc44 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -105,11 +105,11 @@ class JoinOptimizationSuite extends PlanTest {
     val optimized = Optimize.execute(query)
 
     val expected =
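+      // ColumnPruning now removes the no-op Project that used to sit above this Join.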
-      Project(Seq($"x.key", $"y.key"),
-        Join(
-          Project(Seq($"x.key"), SubqueryAlias("x", input)),
-          BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
-          Inner, None)).analyze
+      Join(
+        Project(Seq($"x.key"), SubqueryAlias("x", input)),
+        BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
+        Inner, None).analyze
 
     comparePlans(optimized, expected)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 5b4f6f1d2461bebf7ebf4e38ad7912682eb87fd3..f754acb7619fdcb29a4783986a2eb457d6237f26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -172,7 +172,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       val df = sqlContext.sql(
         "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
       testSparkPlanMetrics(df, 1, Map(
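+        // The no-op Project above the join has been removed, so the join is now node 0.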
-        1L -> ("SortMergeJoin", Map(
+        0L -> ("SortMergeJoin", Map(
           // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
           "number of output rows" -> 4L)))
       )
@@ -190,7 +191,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       val df = sqlContext.sql(
         "SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
       testSparkPlanMetrics(df, 1, Map(
-        1L -> ("SortMergeOuterJoin", Map(
+        0L -> ("SortMergeOuterJoin", Map(
          // It's 8 because the outer join also returns the 4 unmatched rows in addition to the 4 matched ones
           "number of output rows" -> 8L)))
       )
@@ -198,7 +199,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       val df2 = sqlContext.sql(
         "SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
       testSparkPlanMetrics(df2, 1, Map(
-        1L -> ("SortMergeOuterJoin", Map(
+        0L -> ("SortMergeOuterJoin", Map(
          // It's 8 because the outer join also returns the 4 unmatched rows in addition to the 4 matched ones
           "number of output rows" -> 8L)))
       )
@@ -298,7 +299,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       val df = sqlContext.sql(
         "SELECT * FROM testData2 JOIN testDataForJoin")
       testSparkPlanMetrics(df, 1, Map(
-        1L -> ("CartesianProduct", Map(
+        0L -> ("CartesianProduct", Map(
           "number of output rows" -> 12L)))
       )
     }