diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index c054fcbef36f30365ec6f30b093617d91f4c9b78..c4a78f9d2113a79aaf8775bee22feccdaa191bfa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.{AnalysisException, InternalOutputModes} import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.streaming.OutputMode @@ -95,6 +96,16 @@ object UnsupportedOperationChecker { // Operations that cannot exists anywhere in a streaming plan subPlan match { + case Aggregate(_, aggregateExpressions, child) => + val distinctAggExprs = aggregateExpressions.flatMap { expr => + expr.collect { case ae: AggregateExpression if ae.isDistinct => ae } + } + throwErrorIf( + child.isStreaming && distinctAggExprs.nonEmpty, + "Distinct aggregations are not supported on streaming DataFrames/Datasets, unless " + + "it is on aggregated DataFrame/Dataset in Complete output mode. Consider using " + + "approximate distinct aggregation (e.g. 
approx_count_distinct() instead of count()).") + case _: Command => throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " + "streaming DataFrames/Datasets") @@ -143,7 +154,7 @@ object UnsupportedOperationChecker { throwError("Union between streaming and batch DataFrames/Datasets is not supported") case Except(left, right) if right.isStreaming => - throwError("Except with a streaming DataFrame/Dataset on the right is not supported") + throwError("Except on a streaming DataFrame/Dataset on the right is not supported") case Intersect(left, right) if left.isStreaming && right.isStreaming => throwError("Intersect between two streaming DataFrames/Datasets is not supported") @@ -156,7 +167,7 @@ object UnsupportedOperationChecker { case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) => throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on" + - "aggregated DataFrame/Dataset in Complete mode") + " aggregated DataFrame/Dataset in Complete output mode") case Sample(_, _, _, _, child) if child.isStreaming => throwError("Sampling is not supported on streaming DataFrames/Datasets") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index ff1bb126f463d81385da064d9a3a9e652a05e0a6..34e94c71422d766f0a7b8d7dd99b45fd3347b631 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -98,6 +98,19 @@ class UnsupportedOperationsSuite extends SparkFunSuite { outputMode = Update, expectedMsgs = Seq("multiple streaming aggregations")) + // Aggregation: Distinct aggregates not supported on streaming relation + val distinctAggExprs = Seq(Count("*").toAggregateExpression(isDistinct = 
true).as("c")) + assertSupportedInStreamingPlan( + "distinct aggregate - aggregate on batch relation", + Aggregate(Nil, distinctAggExprs, batchRelation), + outputMode = Append) + + assertNotSupportedInStreamingPlan( + "distinct aggregate - aggregate on streaming relation", + Aggregate(Nil, distinctAggExprs, streamRelation), + outputMode = Complete, + expectedMsgs = Seq("distinct aggregation")) + // Inner joins: Stream-stream not supported testBinaryOperationInStreamingPlan( "inner join",