Commit 1ffa608b authored by Tathagata Das

[SPARK-15428][SQL] Disable multiple streaming aggregations

## What changes were proposed in this pull request?

Incrementalizing plans with multiple streaming aggregations is tricky, and we don't yet have the necessary "delta" support to implement them correctly. This change therefore disables support for multiple streaming aggregations.
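For illustration, this is the shape of query the analyzer now rejects (it mirrors the test removed from StreamingAggregationSuite below); `spark` and `inputStream` are hypothetical stand-ins for an existing SparkSession and a streaming DataFrame:

```scala
// A sketch of a now-disallowed query: two chained streaming aggregations.
// `spark` and `inputStream` are assumed handles; only the plan shape matters.
import org.apache.spark.sql.functions._
import spark.implicits._

val twoLevel = inputStream               // streaming DataFrame of ints
  .groupBy($"value")
  .agg(count("*") as "count")            // first streaming aggregation
  .groupBy($"value" % 2)
  .agg(sum($"count"))                    // second streaming aggregation
// Starting this query now fails at analysis time with an AnalysisException:
//   "Multiple streaming aggregations are not supported with
//    streaming DataFrames/Datasets"
```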

## How was this patch tested?
Additional unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #13210 from tdas/SPARK-15428.
parent 845e447f
UnsupportedOperationChecker.scala:

```diff
@@ -55,10 +55,20 @@ object UnsupportedOperationChecker {
         case _: InsertIntoTable =>
           throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets")
 
-        case Aggregate(_, _, child) if child.isStreaming && outputMode == Append =>
-          throwError(
-            "Aggregations are not supported on streaming DataFrames/Datasets in " +
-              "Append output mode. Consider changing output mode to Update.")
+        case Aggregate(_, _, child) if child.isStreaming =>
+          if (outputMode == Append) {
+            throwError(
+              "Aggregations are not supported on streaming DataFrames/Datasets in " +
+                "Append output mode. Consider changing output mode to Update.")
+          }
+          val moreStreamingAggregates = child.find {
+            case Aggregate(_, _, grandchild) if grandchild.isStreaming => true
+            case _ => false
+          }
+          if (moreStreamingAggregates.nonEmpty) {
+            throwError("Multiple streaming aggregations are not supported with " +
+              "streaming DataFrames/Datasets")
+          }
 
         case Join(left, right, joinType, _) =>
```
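The detection is a plain tree search below the matched aggregate: Catalyst's `TreeNode.find` returns the first node satisfying a predicate as an `Option`, so `nonEmpty` signals a second streaming `Aggregate` anywhere in the subtree. Below is a minimal, self-contained sketch of that idea; `Plan`, `Agg`, and `Source` are toy stand-ins, not Spark's classes:

```scala
// Toy plan tree with a find() that has the same contract as TreeNode.find:
// return the first node in the subtree matching a predicate, as an Option.
sealed trait Plan {
  def children: Seq[Plan]
  def isStreaming: Boolean
  def find(p: Plan => Boolean): Option[Plan] =
    if (p(this)) Some(this)
    else children.iterator.map(_.find(p)).collectFirst { case Some(n) => n }
}
case class Source(isStreaming: Boolean) extends Plan {
  def children: Seq[Plan] = Nil
}
case class Agg(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
  def isStreaming: Boolean = child.isStreaming
}

object FindNestedAgg extends App {
  // An aggregate whose child subtree contains another streaming aggregate:
  val outer = Agg(Agg(Source(isStreaming = true)))
  val nested = outer.child.find {
    case Agg(grandchild) if grandchild.isStreaming => true
    case _ => false
  }
  // nonEmpty is exactly the condition under which the real checker throws
  // "Multiple streaming aggregations are not supported ..."
  println(nested.nonEmpty) // true
}
```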
...@@ -23,7 +23,8 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier ...@@ -23,7 +23,8 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.IntegerType
...@@ -95,6 +96,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite { ...@@ -95,6 +96,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
outputMode = Append, outputMode = Append,
Seq("aggregation", "append output mode")) Seq("aggregation", "append output mode"))
// Multiple streaming aggregations not supported
def aggExprs(name: String): Seq[NamedExpression] = Seq(Count("*").as(name))
assertSupportedInStreamingPlan(
"aggregate - multiple batch aggregations",
Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), batchRelation)),
Update)
assertSupportedInStreamingPlan(
"aggregate - multiple aggregations but only one streaming aggregation",
Aggregate(Nil, aggExprs("c"), batchRelation).join(
Aggregate(Nil, aggExprs("d"), streamRelation), joinType = Inner),
Update)
assertNotSupportedInStreamingPlan(
"aggregate - multiple streaming aggregations",
Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), streamRelation)),
outputMode = Update,
expectedMsgs = Seq("multiple streaming aggregations"))
// Inner joins: Stream-stream not supported // Inner joins: Stream-stream not supported
testBinaryOperationInStreamingPlan( testBinaryOperationInStreamingPlan(
"inner join", "inner join",
...@@ -354,17 +375,11 @@ class UnsupportedOperationsSuite extends SparkFunSuite { ...@@ -354,17 +375,11 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
val e = intercept[AnalysisException] { val e = intercept[AnalysisException] {
testBody testBody
} }
expectedMsgs.foreach { m =>
if (!expectedMsgs.map(_.toLowerCase).forall(e.getMessage.toLowerCase.contains)) { if (!e.getMessage.toLowerCase.contains(m.toLowerCase)) {
fail( fail(s"Exception message should contain: '$m', " +
s"""Exception message should contain the following substrings: s"actual exception message:\n\t'${e.getMessage}'")
| }
| ${expectedMsgs.mkString("\n ")}
|
|Actual exception message:
|
| ${e.getMessage}
""".stripMargin)
} }
} }
} }
......
StreamingAggregationSuite.scala:

```diff
@@ -84,25 +84,6 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be
     )
   }
 
-  test("multiple aggregations") {
-    val inputData = MemoryStream[Int]
-
-    val aggregated =
-      inputData.toDF()
-        .groupBy($"value")
-        .agg(count("*") as 'count)
-        .groupBy($"value" % 2)
-        .agg(sum($"count"))
-        .as[(Int, Long)]
-
-    testStream(aggregated)(
-      AddData(inputData, 1, 2, 3, 4),
-      CheckLastBatch((0, 2), (1, 2)),
-      AddData(inputData, 1, 3, 5),
-      CheckLastBatch((1, 5))
-    )
-  }
-
   testQuietly("midbatch failure") {
     val inputData = MemoryStream[Int]
     FailureSinglton.firstTime = true
```
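Only the streaming version of this pattern is gone; the new suite cases above confirm that multiple aggregations over batch data, or a single streaming aggregation, remain supported. A minimal sketch of the batch equivalent, assuming a SparkSession named `spark`:

```scala
// The batch equivalent is unaffected: multiple aggregations over a static
// DataFrame remain supported (mirrors the new "multiple batch aggregations"
// test). `spark` is an assumed SparkSession.
import org.apache.spark.sql.functions._
import spark.implicits._

val result = Seq(1, 2, 3, 4).toDF("value")
  .groupBy($"value")
  .agg(count("*") as "count")
  .groupBy($"value" % 2)
  .agg(sum($"count"))
  .as[(Int, Long)]

result.collect()  // Array((0,2), (1,2)) for this input
```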