Skip to content
Snippets Groups Projects
Commit 4f7292c8 authored by Tathagata Das's avatar Tathagata Das
Browse files

[SPARK-18870] Disallowed Distinct Aggregations on Streaming Datasets

## What changes were proposed in this pull request?

Check whether Aggregation operators on a streaming subplan have aggregate expressions with isDistinct = true.

## How was this patch tested?

Added unit test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16289 from tdas/SPARK-18870.
parent 01e14bf3
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis ...@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.{AnalysisException, InternalOutputModes} import org.apache.spark.sql.{AnalysisException, InternalOutputModes}
import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.streaming.OutputMode
...@@ -95,6 +96,16 @@ object UnsupportedOperationChecker { ...@@ -95,6 +96,16 @@ object UnsupportedOperationChecker {
// Operations that cannot exists anywhere in a streaming plan // Operations that cannot exists anywhere in a streaming plan
subPlan match { subPlan match {
case Aggregate(_, aggregateExpressions, child) =>
val distinctAggExprs = aggregateExpressions.flatMap { expr =>
expr.collect { case ae: AggregateExpression if ae.isDistinct => ae }
}
throwErrorIf(
child.isStreaming && distinctAggExprs.nonEmpty,
"Distinct aggregations are not supported on streaming DataFrames/Datasets, unless " +
"it is on aggregated DataFrame/Dataset in Complete output mode. Consider using " +
"approximate distinct aggregation (e.g. approx_count_distinct() instead of count()).")
case _: Command => case _: Command =>
throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " + throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
"streaming DataFrames/Datasets") "streaming DataFrames/Datasets")
...@@ -143,7 +154,7 @@ object UnsupportedOperationChecker { ...@@ -143,7 +154,7 @@ object UnsupportedOperationChecker {
throwError("Union between streaming and batch DataFrames/Datasets is not supported") throwError("Union between streaming and batch DataFrames/Datasets is not supported")
case Except(left, right) if right.isStreaming => case Except(left, right) if right.isStreaming =>
throwError("Except with a streaming DataFrame/Dataset on the right is not supported") throwError("Except on a streaming DataFrame/Dataset on the right is not supported")
case Intersect(left, right) if left.isStreaming && right.isStreaming => case Intersect(left, right) if left.isStreaming && right.isStreaming =>
throwError("Intersect between two streaming DataFrames/Datasets is not supported") throwError("Intersect between two streaming DataFrames/Datasets is not supported")
...@@ -156,7 +167,7 @@ object UnsupportedOperationChecker { ...@@ -156,7 +167,7 @@ object UnsupportedOperationChecker {
case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) => case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) =>
throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on" + throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on" +
"aggregated DataFrame/Dataset in Complete mode") "aggregated DataFrame/Dataset in Complete output mode")
case Sample(_, _, _, _, child) if child.isStreaming => case Sample(_, _, _, _, child) if child.isStreaming =>
throwError("Sampling is not supported on streaming DataFrames/Datasets") throwError("Sampling is not supported on streaming DataFrames/Datasets")
......
...@@ -98,6 +98,19 @@ class UnsupportedOperationsSuite extends SparkFunSuite { ...@@ -98,6 +98,19 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode = Update, outputMode = Update,
expectedMsgs = Seq("multiple streaming aggregations")) expectedMsgs = Seq("multiple streaming aggregations"))
// Aggregation: Distinct aggregates not supported on streaming relation
val distinctAggExprs = Seq(Count("*").toAggregateExpression(isDistinct = true).as("c"))
assertSupportedInStreamingPlan(
"distinct aggregate - aggregate on batch relation",
Aggregate(Nil, distinctAggExprs, batchRelation),
outputMode = Append)
assertNotSupportedInStreamingPlan(
"distinct aggregate - aggregate on streaming relation",
Aggregate(Nil, distinctAggExprs, streamRelation),
outputMode = Complete,
expectedMsgs = Seq("distinct aggregation"))
// Inner joins: Stream-stream not supported // Inner joins: Stream-stream not supported
testBinaryOperationInStreamingPlan( testBinaryOperationInStreamingPlan(
"inner join", "inner join",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment