diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 9df8ce1fa3b28793298e61f87151dca00c80c516..e5e2cd7d27d15378bcb3a9a513ce7078032abdb6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -88,6 +88,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       // Operator combine
       CollapseRepartition,
       CollapseProject,
+      CollapseWindow,
       CombineFilters,
       CombineLimits,
       CombineUnions,
@@ -537,6 +538,17 @@ object CollapseRepartition extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Collapse Adjacent Window Expression.
+ * - If the partition specs and order specs are the same, collapse into the parent.
+ */
+object CollapseWindow extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case w @ Window(we1, ps1, os1, Window(we2, ps2, os2, grandChild)) if ps1 == ps2 && os1 == os2 =>
+      w.copy(windowExpressions = we1 ++ we2, child = grandChild)
+  }
+}
+
 /**
  * Generate a list of additional filters from an operator's existing constraint but remove those
  * that are either already part of the operator's condition or are part of the operator's child
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..797076e55cfccb4905400c7bce6e721a27701d2e
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class CollapseWindowSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("CollapseWindow", FixedPoint(10),
+        CollapseWindow) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.double, 'b.double, 'c.string)
+  val a = testRelation.output(0)
+  val b = testRelation.output(1)
+  val c = testRelation.output(2)
+  val partitionSpec1 = Seq(c)
+  val partitionSpec2 = Seq(c + 1)
+  val orderSpec1 = Seq(c.asc)
+  val orderSpec2 = Seq(c.desc)
+
+  test("collapse two adjacent windows with the same partition/order") {
+    val query = testRelation
+      .window(Seq(min(a).as('min_a)), partitionSpec1, orderSpec1)
+      .window(Seq(max(a).as('max_a)), partitionSpec1, orderSpec1)
+      .window(Seq(sum(b).as('sum_b)), partitionSpec1, orderSpec1)
+      .window(Seq(avg(b).as('avg_b)), partitionSpec1, orderSpec1)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = testRelation.window(Seq(
+        avg(b).as('avg_b),
+        sum(b).as('sum_b),
+        max(a).as('max_a),
+        min(a).as('min_a)), partitionSpec1, orderSpec1)
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("Don't collapse adjacent windows with different partitions or orders") {
+    val query1 = testRelation
+      .window(Seq(min(a).as('min_a)), partitionSpec1, orderSpec1)
+      .window(Seq(max(a).as('max_a)), partitionSpec1, orderSpec2)
+
+    val optimized1 = Optimize.execute(query1.analyze)
+    val correctAnswer1 = query1.analyze
+
+    comparePlans(optimized1, correctAnswer1)
+
+    val query2 = testRelation
+      .window(Seq(min(a).as('min_a)), partitionSpec1, orderSpec1)
+      .window(Seq(max(a).as('max_a)), partitionSpec2, orderSpec1)
+
+    val optimized2 = Optimize.execute(query2.analyze)
+    val correctAnswer2 = query2.analyze
+
+    comparePlans(optimized2, correctAnswer2)
+  }
+}