Skip to content
Snippets Groups Projects
Commit 552e5f08 authored by Tathagata Das's avatar Tathagata Das
Browse files

[SPARK-19314][SS][CATALYST] Do not allow sort before aggregation in Structured Streaming plan

## What changes were proposed in this pull request?

Sort in a streaming plan should be allowed only after a aggregation in complete mode. Currently it is incorrectly allowed when present anywhere in the plan. It gives unpredictable potentially incorrect results.

## How was this patch tested?
New test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16662 from tdas/SPARK-19314.
parent e20d9b15
No related branches found
No related tags found
No related merge requests found
...@@ -87,7 +87,7 @@ object UnsupportedOperationChecker { ...@@ -87,7 +87,7 @@ object UnsupportedOperationChecker {
* data. * data.
*/ */
def containsCompleteData(subplan: LogicalPlan): Boolean = { def containsCompleteData(subplan: LogicalPlan): Boolean = {
val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a } val aggs = subplan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a }
// Either the subplan has no streaming source, or it has aggregation with Complete mode // Either the subplan has no streaming source, or it has aggregation with Complete mode
!subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete) !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
} }
......
...@@ -199,12 +199,17 @@ class UnsupportedOperationsSuite extends SparkFunSuite { ...@@ -199,12 +199,17 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
_.intersect(_), _.intersect(_),
streamStreamSupported = false) streamStreamSupported = false)
// Sort: supported only on batch subplans and on aggregation + complete output mode // Sort: supported only on batch subplans and after aggregation on streaming plan + complete mode
testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _)) testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
assertSupportedInStreamingPlan( assertSupportedInStreamingPlan(
"sort - sort over aggregated data in Complete output mode", "sort - sort after aggregation in Complete output mode",
streamRelation.groupBy()(Count("*")).sortBy(), streamRelation.groupBy()(Count("*")).sortBy(),
Complete) Complete)
assertNotSupportedInStreamingPlan(
"sort - sort before aggregation in Complete output mode",
streamRelation.sortBy().groupBy()(Count("*")),
Complete,
Seq("sort", "aggregat", "complete"))
assertNotSupportedInStreamingPlan( assertNotSupportedInStreamingPlan(
"sort - sort over aggregated data in Update output mode", "sort - sort over aggregated data in Update output mode",
streamRelation.groupBy()(Count("*")).sortBy(), streamRelation.groupBy()(Count("*")).sortBy(),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment