diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 289453753f18db4daa9841f3d514145c3d92549e..1d3379a5e2d913380b1df6f8663eb4b7d4b44547 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -219,6 +219,8 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
     val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
     val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
     var children: Seq[SparkPlan] = operator.children
+    assert(requiredChildDistributions.length == children.length)
+    assert(requiredChildOrderings.length == children.length)
 
     // Ensure that the operator's children satisfy their output distribution requirements:
     children = children.zip(requiredChildDistributions).map { case (child, distribution) =>
@@ -248,8 +250,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
     children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
       if (requiredOrdering.nonEmpty) {
         // If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
-        val minSize = Seq(requiredOrdering.size, child.outputOrdering.size).min
-        if (minSize == 0 || requiredOrdering.take(minSize) != child.outputOrdering.take(minSize)) {
+        if (requiredOrdering != child.outputOrdering.take(requiredOrdering.length)) {
           sqlContext.planner.BasicOperators.getSortOperator(requiredOrdering, global = false, child)
         } else {
           child
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index cafa1d5154788b49783fc2c58aaa5adf28443b0d..ebdab1c26d7bddd52abe3f913b8cfab3491e78ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -354,6 +354,55 @@ class PlannerSuite extends SharedSQLContext {
     }
   }
 
+  test("EnsureRequirements adds sort when there is no existing ordering") {
+    val orderingA = SortOrder(Literal(1), Ascending)
+    val orderingB = SortOrder(Literal(2), Ascending)
+    assert(orderingA != orderingB)
+    val inputPlan = DummySparkPlan(
+      children = DummySparkPlan(outputOrdering = Seq.empty) :: Nil,
+      requiredChildOrdering = Seq(Seq(orderingB)),
+      requiredChildDistribution = Seq(UnspecifiedDistribution)
+    )
+    val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
+    assertDistributionRequirementsAreSatisfied(outputPlan)
+    if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => true }.isEmpty) {
+      fail(s"Sort should have been added:\n$outputPlan")
+    }
+  }
+
+  test("EnsureRequirements skips sort when required ordering is prefix of existing ordering") {
+    val orderingA = SortOrder(Literal(1), Ascending)
+    val orderingB = SortOrder(Literal(2), Ascending)
+    assert(orderingA != orderingB)
+    val inputPlan = DummySparkPlan(
+      children = DummySparkPlan(outputOrdering = Seq(orderingA, orderingB)) :: Nil,
+      requiredChildOrdering = Seq(Seq(orderingA)),
+      requiredChildDistribution = Seq(UnspecifiedDistribution)
+    )
+    val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
+    assertDistributionRequirementsAreSatisfied(outputPlan)
+    if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => true }.nonEmpty) {
+      fail(s"No sorts should have been added:\n$outputPlan")
+    }
+  }
+
+  // This is a regression test for SPARK-11135
+  test("EnsureRequirements adds sort when required ordering isn't a prefix of existing ordering") {
+    val orderingA = SortOrder(Literal(1), Ascending)
+    val orderingB = SortOrder(Literal(2), Ascending)
+    assert(orderingA != orderingB)
+    val inputPlan = DummySparkPlan(
+      children = DummySparkPlan(outputOrdering = Seq(orderingA)) :: Nil,
+      requiredChildOrdering = Seq(Seq(orderingA, orderingB)),
+      requiredChildDistribution = Seq(UnspecifiedDistribution)
+    )
+    val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
+    assertDistributionRequirementsAreSatisfied(outputPlan)
+    if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => true }.isEmpty) {
+      fail(s"Sort should have been added:\n$outputPlan")
+    }
+  }
+
   // ---------------------------------------------------------------------------------------------
 }