Skip to content
Snippets Groups Projects
Commit f7cf2096 authored by Xiao Li's avatar Xiao Li
Browse files

[SPARK-20941][SQL] Fix SubqueryExec Reuse

### What changes were proposed in this pull request?
Before this PR, subquery reuse does not work. Below are three issues:
- Subquery reuse is never triggered, because plan comparison does not ignore the `SubqueryExec` wrapper.
- It shares the same `SQLConf` (`spark.sql.exchange.reuse`) with Exchange Reuse.
- No test case covers the Subquery reuse rule.

This PR is to fix the above three issues.
- Ignored the physical operator `SubqueryExec` when comparing two plans.
- Added a dedicated conf `spark.sql.subquery.reuse` for controlling Subquery Reuse
- Added a test case for verifying the behavior

### How was this patch tested?
Added a test case to `SQLQuerySuite` that verifies the plan with `spark.sql.subquery.reuse` both enabled and disabled.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #18169 from gatorsmile/subqueryReuse.
parent 0975019c
No related branches found
No related tags found
No related merge requests found
......@@ -552,6 +552,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
// Dedicated flag for the ReuseSubquery rule, separate from
// spark.sql.exchange.reuse (EXCHANGE_REUSE_ENABLED) so the two
// reuse mechanisms can be toggled independently.
val SUBQUERY_REUSE_ENABLED = buildConf("spark.sql.subquery.reuse")
.internal()
.doc("When true, the planner will try to find out duplicated subqueries and re-use them.")
.booleanConf
.createWithDefault(true)
val STATE_STORE_PROVIDER_CLASS =
buildConf("spark.sql.streaming.stateStore.providerClass")
.internal()
......@@ -932,6 +938,8 @@ class SQLConf extends Serializable with Logging {
def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED)
def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)
......
......@@ -595,6 +595,9 @@ case class OutputFakerExec(output: Seq[Attribute], child: SparkPlan) extends Spa
*/
case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
// Ignore this wrapper for canonicalizing.
override lazy val canonicalized: SparkPlan = child.canonicalized
override lazy val metrics = Map(
"dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
"collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"))
......
......@@ -156,7 +156,7 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
def apply(plan: SparkPlan): SparkPlan = {
if (!conf.exchangeReuseEnabled) {
if (!conf.subqueryReuseEnabled) {
return plan
}
// Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls.
......
......@@ -23,9 +23,12 @@ import java.net.{MalformedURLException, URL}
import java.sql.Timestamp
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{AccumulatorSuite, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.{ScalarSubquery, SubqueryExec}
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
......@@ -700,6 +703,38 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
row => Seq.fill(16)(Row.merge(row, row))).collect().toSeq)
}
test("Verify spark.sql.subquery.reuse") {
  // Run the same query with subquery reuse both enabled and disabled and
  // inspect the executed plan for shared SubqueryExec instances.
  for (reuse <- Seq(true, false)) {
    withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
      // The query contains the identical scalar subquery twice, so with
      // reuse enabled both occurrences should resolve to one physical plan.
      val result = sql(
        """
          |SELECT key, (SELECT avg(key) FROM testData)
          |FROM testData
          |WHERE key > (SELECT avg(key) FROM testData)
          |ORDER BY key
          |LIMIT 3
        """.stripMargin)
      checkAnswer(result, Seq(Row(51, 50.5), Row(52, 50.5), Row(53, 50.5)))

      // Collect every SubqueryExec referenced from a ScalarSubquery expression.
      val found = ArrayBuffer.empty[SubqueryExec]
      result.queryExecution.executedPlan.transformAllExpressions {
        case s @ ScalarSubquery(plan: SubqueryExec, _) =>
          found += plan
          s
      }
      assert(found.size == 2, "Two ScalarSubquery are expected in the plan")
      if (reuse) {
        // Reuse on: both expressions must share the same physical subquery.
        assert(found.distinct.size == 1, "Only one ScalarSubquery exists in the plan")
      } else {
        assert(found.distinct.size == 2, "Reuse is not expected")
      }
    }
  }
}
test("cartesian product join") {
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
checkAnswer(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment