Skip to content
Snippets Groups Projects
Commit 570e5e11 authored by Shixiong Zhu's avatar Shixiong Zhu
Browse files

[SPARK-19268][SS] Disallow adaptive query execution for streaming queries


## What changes were proposed in this pull request?

As adaptive query execution may change the number of partitions in different batches, it may break streaming queries. Hence, we should disallow this feature in Structured Streaming.

## How was this patch tested?

`test("SPARK-19268: Adaptive query execution should be disallowed")`.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16683 from zsxwing/SPARK-19268.

(cherry picked from commit 60bd91a3)
Signed-off-by: default avatarShixiong Zhu <shixiong@databricks.com>
parent 4a2be090
No related branches found
No related tags found
No related merge requests found
......@@ -230,6 +230,12 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
}
if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
throw new AnalysisException(
s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
"is not supported in streaming DataFrames/Datasets")
}
new StreamingQueryWrapper(new StreamExecution(
sparkSession,
userSpecifiedName.orNull,
......
......@@ -30,8 +30,9 @@ import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkException
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.{AnalysisException, Dataset}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.BlockingSource
import org.apache.spark.util.Utils
......@@ -238,6 +239,15 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
}
}
test("SPARK-19268: Adaptive query execution should be disallowed") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val e = intercept[AnalysisException] {
MemoryStream[Int].toDS.writeStream.queryName("test-query").format("memory").start()
}
assert(e.getMessage.contains(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key) &&
e.getMessage.contains("not supported"))
}
}
/** Run a body of code by defining a query on each dataset */
private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment