Commit abdb5d42 authored by Tathagata Das

[SPARK-15812][SQL][STREAMING] Added support for sorting after streaming aggregation with complete mode

## What changes were proposed in this pull request?

When the output mode is Complete, the output of a streaming aggregation contains the complete aggregates every time, so within an incremental execution it is no different from a batch Dataset. Other non-streaming operations should therefore be supported on this Dataset. This PR adds support only for sorting, as it is a common and useful operation; support for other operations will come later.
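
For context, a minimal sketch (not itself part of the patch) of the query shape this enables, modeled on the new test added in this PR; assume a SparkSession's implicits and the repo's `MemoryStream` test source are in scope:

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.count

val inputData = MemoryStream[Int]

// Aggregate the stream, then sort by the aggregate. Previously the orderBy was
// rejected for every streaming Dataset; it is now allowed when the query runs
// in Complete output mode, because each trigger emits the full aggregated
// result, which can be sorted globally like a batch Dataset.
val aggregated = inputData.toDF()
  .groupBy($"value")
  .agg(count("*"))
  .toDF("value", "count")
  .orderBy($"count".desc)
  .as[(Int, Long)]
```

In Append or Update mode, `UnsupportedOperationChecker` still rejects the same query, since only Complete mode guarantees that the sorted output covers all the data.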

## How was this patch tested?
Additional unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #13549 from tdas/SPARK-15812.
parent cdd7f5a5
@@ -43,6 +43,41 @@ object UnsupportedOperationChecker {
         "Queries without streaming sources cannot be executed with write.startStream()")(plan)
     }
 
+    // Disallow multiple streaming aggregations
+    val aggregates = plan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a }
+
+    if (aggregates.size > 1) {
+      throwError(
+        "Multiple streaming aggregations are not supported with " +
+          "streaming DataFrames/Datasets")(plan)
+    }
+
+    // Disallow some output modes
+    outputMode match {
+      case InternalOutputModes.Append if aggregates.nonEmpty =>
+        throwError(
+          s"$outputMode output mode not supported when there are streaming aggregations on " +
+            s"streaming DataFrames/Datasets")(plan)
+
+      case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
+        throwError(
+          s"$outputMode output mode not supported when there are no streaming aggregations on " +
+            s"streaming DataFrames/Datasets")(plan)
+
+      case _ =>
+    }
+
+    /**
+     * Whether the subplan will contain complete data or incremental data in every incremental
+     * execution. Some operations may be allowed only when the child logical plan gives complete
+     * data.
+     */
+    def containsCompleteData(subplan: LogicalPlan): Boolean = {
+      val aggs = plan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a }
+      // Either the subplan has no streaming source, or it has aggregation with Complete mode
+      !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
+    }
+
     plan.foreachUp { implicit subPlan =>
 
       // Operations that cannot exist anywhere in a streaming plan
@@ -107,8 +142,9 @@ object UnsupportedOperationChecker {
         case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) =>
           throwError("Limits are not supported on streaming DataFrames/Datasets")
 
-        case Sort(_, _, _) | SortPartitions(_, _) if subPlan.children.forall(_.isStreaming) =>
-          throwError("Sorting is not supported on streaming DataFrames/Datasets")
+        case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) =>
+          throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is " +
+            "on aggregated DataFrame/Dataset in Complete mode")
 
         case Sample(_, _, _, _, child) if child.isStreaming =>
           throwError("Sampling is not supported on streaming DataFrames/Datasets")
@@ -123,27 +159,6 @@ object UnsupportedOperationChecker {
         case _ =>
       }
     }
-
-    // Checks related to aggregations
-    val aggregates = plan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a }
-
-    outputMode match {
-      case InternalOutputModes.Append if aggregates.nonEmpty =>
-        throwError(
-          s"$outputMode output mode not supported when there are streaming aggregations on " +
-            s"streaming DataFrames/DataSets")(plan)
-
-      case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
-        throwError(
-          s"$outputMode output mode not supported when there are no streaming aggregations on " +
-            s"streaming DataFrames/Datasets")(plan)
-
-      case _ =>
-    }
-
-    if (aggregates.size > 1) {
-      throwError(
-        "Multiple streaming aggregations are not supported with " +
-          "streaming DataFrames/Datasets")(plan)
-    }
   }
 
   private def throwErrorIf(
@@ -81,7 +81,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     outputMode = Append,
     expectedMsgs = "commands" :: Nil)
 
-  // Multiple streaming aggregations not supported
+  // Aggregation: Multiple streaming aggregations not supported
  def aggExprs(name: String): Seq[NamedExpression] = Seq(Count("*").as(name))
 
   assertSupportedInStreamingPlan(
@@ -189,8 +189,20 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.intersect(_),
     streamStreamSupported = false)
 
-  // Unary operations
+  // Sort: supported only on batch subplans and on aggregation + complete output mode
   testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
+  assertSupportedInStreamingPlan(
+    "sort - sort over aggregated data in Complete output mode",
+    streamRelation.groupBy()(Count("*")).sortBy(),
+    Complete)
+  assertNotSupportedInStreamingPlan(
+    "sort - sort over aggregated data in Update output mode",
+    streamRelation.groupBy()(Count("*")).sortBy(),
+    Update,
+    Seq("sort", "aggregat", "complete")) // sort on aggregations is supported in Complete mode only
+
+  // Other unary operations
   testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort")
   testUnaryOperatorInStreamingPlan(
     "sample", Sample(0.1, 1, true, 1L, _)(), expectedMsg = "sampling")
@@ -299,6 +311,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
       outputMode)
   }
 
+  /** Test output mode with and without aggregation in the streaming plan */
   def testOutputMode(
       outputMode: OutputMode,
       shouldSupportAggregation: Boolean): Unit = {
@@ -111,10 +111,13 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     def apply[A : Encoder](data: A*): CheckAnswerRows = {
       val encoder = encoderFor[A]
       val toExternalRow = RowEncoder(encoder.schema).resolveAndBind()
-      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), false)
+      CheckAnswerRows(
+        data.map(d => toExternalRow.fromRow(encoder.toRow(d))),
+        lastOnly = false,
+        isSorted = false)
     }
 
-    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false)
+    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false, false)
   }
 
   /**
@@ -123,15 +126,22 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
    */
   object CheckLastBatch {
     def apply[A : Encoder](data: A*): CheckAnswerRows = {
+      apply(isSorted = false, data: _*)
+    }
+
+    def apply[A: Encoder](isSorted: Boolean, data: A*): CheckAnswerRows = {
       val encoder = encoderFor[A]
       val toExternalRow = RowEncoder(encoder.schema).resolveAndBind()
-      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), true)
+      CheckAnswerRows(
+        data.map(d => toExternalRow.fromRow(encoder.toRow(d))),
+        lastOnly = true,
+        isSorted = isSorted)
     }
 
-    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, true)
+    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, true, false)
   }
 
-  case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean)
+  case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean, isSorted: Boolean)
     extends StreamAction with StreamMustBeRunning {
     override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}"
     private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
@@ -414,7 +424,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
             failTest("Error adding data", e)
         }
 
-      case CheckAnswerRows(expectedAnswer, lastOnly) =>
+      case CheckAnswerRows(expectedAnswer, lastOnly, isSorted) =>
         verify(currentStream != null, "stream not running")
 
         // Get the map of source index to the current source objects
         val indexToSource = currentStream
@@ -436,7 +446,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
             failTest("Exception while getting data from sink", e)
         }
 
-        QueryTest.sameRows(expectedAnswer, sparkAnswer).foreach {
+        QueryTest.sameRows(expectedAnswer, sparkAnswer, isSorted).foreach {
           error => failTest(error)
         }
     }
@@ -104,6 +104,31 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
     }
   }
 
+  test("sort after aggregate in complete mode") {
+    val inputData = MemoryStream[Int]
+    val aggregated =
+      inputData.toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .toDF("value", "count")
+        .orderBy($"count".desc)
+        .as[(Int, Long)]
+
+    testStream(aggregated, Complete)(
+      AddData(inputData, 3),
+      CheckLastBatch(isSorted = true, (3, 1)),
+      AddData(inputData, 2, 3),
+      CheckLastBatch(isSorted = true, (3, 2), (2, 1)),
+      StopStream,
+      StartStream(),
+      AddData(inputData, 3, 2, 1),
+      CheckLastBatch(isSorted = true, (3, 3), (2, 2), (1, 1)),
+      AddData(inputData, 4, 4, 4, 4),
+      CheckLastBatch(isSorted = true, (4, 4), (3, 3), (2, 2), (1, 1))
+    )
+  }
+
   test("multiple keys") {
     val inputData = MemoryStream[Int]