diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 3037d45cc6e35ba01559b8fd7a7828920d1ba4d5..406ffd6801e9864355c53eb49f3140598db0a60a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -25,13 +25,13 @@ import org.apache.spark.sql.catalyst.types._
 
 object Optimizer extends RuleExecutor[LogicalPlan] {
   val batches =
-    Batch("ConstantFolding", Once,
+    Batch("ConstantFolding", FixedPoint(100),
       NullPropagation,
       ConstantFolding,
       BooleanSimplification,
       SimplifyFilters,
       SimplifyCasts) ::
-    Batch("Filter Pushdown", Once,
+    Batch("Filter Pushdown", FixedPoint(100),
       CombineFilters,
       PushPredicateThroughProject,
       PushPredicateThroughInnerJoin,
@@ -49,17 +49,19 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
  */
 object ColumnPruning extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // Eliminate attributes that are not needed to calculate the specified aggregates.
     case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
-      // Project away references that are not needed to calculate the required aggregates.
       a.copy(child = Project(a.references.toSeq, child))
 
+    // Eliminate unneeded attributes from either side of a Join.
     case Project(projectList, Join(left, right, joinType, condition)) =>
       // Collect the list of off references required either above or to evaluate the condition.
       val allReferences: Set[Attribute] =
         projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)
-      /** Applies a projection when the child is producing unnecessary attributes */
+
+      /** Applies a projection only when the child is producing unnecessary attributes */
       def prunedChild(c: LogicalPlan) =
-        if ((allReferences.filter(c.outputSet.contains) -- c.outputSet).nonEmpty) {
+        if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
           Project(allReferences.filter(c.outputSet.contains).toSeq, c)
         } else {
           c
@@ -67,6 +69,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
 
       Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))
 
+    // Combine adjacent Projects.
     case Project(projectList1, Project(projectList2, child)) =>
       // Create a map of Aliases to their values from the child projection.
       // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
@@ -83,6 +86,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
       }).asInstanceOf[Seq[NamedExpression]]
 
       Project(substitutedProjection, child)
+
+    // Eliminate no-op Projects
+    case Project(projectList, child) if(child.output == projectList) => child
   }
 }