diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1ee5fb245fbb2f8db58e0fcfed29a9ab9535a115..b163707cc9925cbb986a8a85632a4155fb426386 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -43,6 +43,7 @@ object DefaultOptimizer extends Optimizer {
       PushPredicateThroughJoin,
       PushPredicateThroughGenerate,
       ColumnPruning,
+      ProjectCollapsing,
       CombineLimits) ::
     Batch("ConstantFolding", FixedPoint(100),
       NullPropagation,
@@ -114,7 +115,7 @@ object UnionPushdown extends Rule[LogicalPlan] {
  *   - Aggregate
  *   - Project <- Join
  *   - LeftSemiJoin
- *  - Collapse adjacent projections, performing alias substitution.
+ *  - Performing alias substitution.
  */
 object ColumnPruning extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -152,7 +153,28 @@ object ColumnPruning extends Rule[LogicalPlan] {
 
       Join(left, prunedChild(right, allReferences), LeftSemi, condition)
 
-    // Combine adjacent Projects.
+    case Project(projectList, Limit(exp, child)) =>
+      Limit(exp, Project(projectList, child))
+
+    // Eliminate no-op Projects
+    case Project(projectList, child) if child.output == projectList => child
+  }
+
+  /** Applies a projection only when the child is producing unnecessary attributes */
+  private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) =
+    if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
+      Project(allReferences.filter(c.outputSet.contains).toSeq, c)
+    } else {
+      c
+    }
+}
+
+/**
+ * Combines two adjacent [[Project]] operators into one, merging the
+ * expressions into one single expression.
+ */
+object ProjectCollapsing extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case Project(projectList1, Project(projectList2, child)) =>
       // Create a map of Aliases to their values from the child projection.
       // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
@@ -169,21 +191,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
       }).asInstanceOf[Seq[NamedExpression]]
 
       Project(substitutedProjection, child)
-
-    case Project(projectList, Limit(exp, child)) =>
-      Limit(exp, Project(projectList, child))
-      
-    // Eliminate no-op Projects
-    case Project(projectList, child) if child.output == projectList => child
   }
-
-  /** Applies a projection only when the child is producing unnecessary attributes */
-  private def prunedChild(c: LogicalPlan, allReferences: AttributeSet) =
-    if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
-      Project(allReferences.filter(c.outputSet.contains).toSeq, c)
-    } else {
-      c
-    }
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 58d415d9011e1874036a4dc66697afad14c8d98e..0c428f7231b8e1413f381b9e752e9bbe9774b5ae 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -38,7 +38,8 @@ class FilterPushdownSuite extends PlanTest {
         PushPredicateThroughProject,
         PushPredicateThroughJoin,
         PushPredicateThroughGenerate,
-        ColumnPruning) :: Nil
+        ColumnPruning,
+        ProjectCollapsing) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index f3107f7b51ad858de7005d36648cc4c6d507fe17..1f85dac682cbedca778d24c23fc964d76ea8f4a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -567,7 +567,9 @@ class DataFrame private[sql](
       case Column(expr: NamedExpression) => expr
       case Column(expr: Expression) => Alias(expr, expr.prettyString)()
     }
-    Project(namedExpressions.toSeq, logicalPlan)
+    // When user continuously call `select`, speed up analysis by collapsing `Project`
+    import org.apache.spark.sql.catalyst.optimizer.ProjectCollapsing
+    Project(namedExpressions.toSeq, ProjectCollapsing(logicalPlan))
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index d58438e5d129c240b7f6a0100f9f7c557f8c1876..52aa1f6558f80766cd7ada20c89839cd6cafc338 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -493,4 +493,16 @@ class DataFrameSuite extends QueryTest {
       testData.dropDuplicates(Seq("value2")),
       Seq(Row(2, 1, 2), Row(1, 1, 1)))
   }
+
+  test("SPARK-7276: Project collapse for continuous select") {
+    var df = testData
+    for (i <- 1 to 5) {
+      df = df.select($"*")
+    }
+
+    import org.apache.spark.sql.catalyst.plans.logical.Project
+    // make sure df have at most two Projects
+    val p = df.logicalPlan.asInstanceOf[Project].child.asInstanceOf[Project]
+    assert(!p.child.isInstanceOf[Project])
+  }
 }