diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 8373fa336dd4c59da5042fd9d33c42a104e8e8b7..689e016a5a1d94c9b678595c051a03341a1bc476 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -43,6 +43,41 @@ object UnsupportedOperationChecker {
         "Queries without streaming sources cannot be executed with write.startStream()")(plan)
     }
 
+    // Disallow multiple streaming aggregations
+    val aggregates = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a }
+
+    if (aggregates.size > 1) {
+      throwError(
+        "Multiple streaming aggregations are not supported with " +
+          "streaming DataFrames/Datasets")(plan)
+    }
+
+    // Disallow some output mode
+    outputMode match {
+      case InternalOutputModes.Append if aggregates.nonEmpty =>
+        throwError(
+          s"$outputMode output mode not supported when there are streaming aggregations on " +
+            s"streaming DataFrames/DataSets")(plan)
+
+      case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
+        throwError(
+          s"$outputMode output mode not supported when there are no streaming aggregations on " +
+            s"streaming DataFrames/Datasets")(plan)
+
+      case _ =>
+    }
+
+    /**
+     * Whether the subplan will contain complete data or incremental data in every incremental
+     * execution. Some operations may be allowed only when the child logical plan gives complete
+     * data.
+     */
+    def containsCompleteData(subplan: LogicalPlan): Boolean = {
+      val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a }
+      // Either the subplan has no streaming source, or it has aggregation with Complete mode
+      !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
+    }
+
     plan.foreachUp { implicit subPlan =>
 
       // Operations that cannot exists anywhere in a streaming plan
@@ -107,8 +142,9 @@ object UnsupportedOperationChecker {
         case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) =>
           throwError("Limits are not supported on streaming DataFrames/Datasets")
 
-        case Sort(_, _, _) | SortPartitions(_, _) if subPlan.children.forall(_.isStreaming) =>
-          throwError("Sorting is not supported on streaming DataFrames/Datasets")
+        case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) =>
+          throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on" +
+            "aggregated DataFrame/Dataset in Complete mode")
 
         case Sample(_, _, _, _, child) if child.isStreaming =>
           throwError("Sampling is not supported on streaming DataFrames/Datasets")
@@ -123,27 +159,6 @@ object UnsupportedOperationChecker {
         case _ =>
       }
     }
-
-    // Checks related to aggregations
-    val aggregates = plan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a }
-    outputMode match {
-      case InternalOutputModes.Append if aggregates.nonEmpty =>
-        throwError(
-          s"$outputMode output mode not supported when there are streaming aggregations on " +
-            s"streaming DataFrames/DataSets")(plan)
-
-      case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
-        throwError(
-          s"$outputMode output mode not supported when there are no streaming aggregations on " +
-            s"streaming DataFrames/Datasets")(plan)
-
-      case _ =>
-    }
-    if (aggregates.size > 1) {
-      throwError(
-        "Multiple streaming aggregations are not supported with " +
-          "streaming DataFrames/Datasets")(plan)
-    }
   }
 
   private def throwErrorIf(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 378cca3644eab06e0403492e8f24115a10642b01..c21ad5e03a48d48d367e72ca72e896bb1fa3ffe3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -81,7 +81,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     outputMode = Append,
     expectedMsgs = "commands" :: Nil)
 
-  // Multiple streaming aggregations not supported
+  // Aggregation: Multiple streaming aggregations not supported
   def aggExprs(name: String): Seq[NamedExpression] = Seq(Count("*").as(name))
 
   assertSupportedInStreamingPlan(
@@ -189,8 +189,20 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.intersect(_),
     streamStreamSupported = false)
 
-  // Unary operations
+  // Sort: supported only on batch subplans and on aggregation + complete output mode
   testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
+  assertSupportedInStreamingPlan(
+    "sort - sort over aggregated data in Complete output mode",
+    streamRelation.groupBy()(Count("*")).sortBy(),
+    Complete)
+  assertNotSupportedInStreamingPlan(
+    "sort - sort over aggregated data in Update output mode",
+    streamRelation.groupBy()(Count("*")).sortBy(),
+    Update,
+    Seq("sort", "aggregat", "complete")) // sort on aggregations is supported on Complete mode only
+
+
+  // Other unary operations
   testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort")
   testUnaryOperatorInStreamingPlan(
     "sample", Sample(0.1, 1, true, 1L, _)(), expectedMsg = "sampling")
@@ -299,6 +311,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
       outputMode)
   }
 
+  /** Test output mode with and without aggregation in the streaming plan */
   def testOutputMode(
       outputMode: OutputMode,
       shouldSupportAggregation: Boolean): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 194c3e7307255c14cab1bb38a38355cb8793604b..7f1e5fe6135a77ea0d99fee8540d8e1395d9f9fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -111,10 +111,13 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     def apply[A : Encoder](data: A*): CheckAnswerRows = {
       val encoder = encoderFor[A]
       val toExternalRow = RowEncoder(encoder.schema).resolveAndBind()
-      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), false)
+      CheckAnswerRows(
+        data.map(d => toExternalRow.fromRow(encoder.toRow(d))),
+        lastOnly = false,
+        isSorted = false)
     }
 
-    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false)
+    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false, false)
   }
 
   /**
@@ -123,15 +126,22 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
    */
   object CheckLastBatch {
     def apply[A : Encoder](data: A*): CheckAnswerRows = {
+      apply(isSorted = false, data: _*)
+    }
+
+    def apply[A: Encoder](isSorted: Boolean, data: A*): CheckAnswerRows = {
       val encoder = encoderFor[A]
       val toExternalRow = RowEncoder(encoder.schema).resolveAndBind()
-      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), true)
+      CheckAnswerRows(
+        data.map(d => toExternalRow.fromRow(encoder.toRow(d))),
+        lastOnly = true,
+        isSorted = isSorted)
     }
 
-    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, true)
+    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, true, false)
   }
 
-  case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean)
+  case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean, isSorted: Boolean)
       extends StreamAction with StreamMustBeRunning {
     override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}"
     private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
@@ -414,7 +424,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
                 failTest("Error adding data", e)
             }
 
-          case CheckAnswerRows(expectedAnswer, lastOnly) =>
+          case CheckAnswerRows(expectedAnswer, lastOnly, isSorted) =>
             verify(currentStream != null, "stream not running")
             // Get the map of source index to the current source objects
             val indexToSource = currentStream
@@ -436,7 +446,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
                 failTest("Exception while getting data from sink", e)
             }
 
-            QueryTest.sameRows(expectedAnswer, sparkAnswer).foreach {
+            QueryTest.sameRows(expectedAnswer, sparkAnswer, isSorted).foreach {
               error => failTest(error)
             }
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 1f174aee8ce08cc27229b2bd74f0df202bd95c63..8681199817fe680d7d0e7f1e733281a3a4f6c607 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -104,6 +104,31 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
     }
   }
 
+  test("sort after aggregate in complete mode") {
+    val inputData = MemoryStream[Int]
+
+    val aggregated =
+      inputData.toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .toDF("value", "count")
+        .orderBy($"count".desc)
+        .as[(Int, Long)]
+
+    testStream(aggregated, Complete)(
+      AddData(inputData, 3),
+      CheckLastBatch(isSorted = true, (3, 1)),
+      AddData(inputData, 2, 3),
+      CheckLastBatch(isSorted = true, (3, 2), (2, 1)),
+      StopStream,
+      StartStream(),
+      AddData(inputData, 3, 2, 1),
+      CheckLastBatch(isSorted = true, (3, 3), (2, 2), (1, 1)),
+      AddData(inputData, 4, 4, 4, 4),
+      CheckLastBatch(isSorted = true, (4, 4), (3, 3), (2, 2), (1, 1))
+    )
+  }
+
   test("multiple keys") {
     val inputData = MemoryStream[Int]