Skip to content
Snippets Groups Projects
Commit a04cab8f authored by Dongjoon Hyun, committed by Wenchen Fan
Browse files

[SPARK-16174][SQL] Improve `OptimizeIn` optimizer to remove literal repetitions

## What changes were proposed in this pull request?

This PR improves the `OptimizeIn` optimizer rule so that it removes repeated literals from SQL `IN` predicates. The rule guards against user mistakes and can also optimize some queries, such as [TPCDS-36](https://github.com/apache/spark/blob/master/sql/core/src/test/resources/tpcds/q36.sql#L19).

**Before**
```scala
scala> sql("select state from (select explode(array('CA','TN')) state) where state in ('TN','TN','TN','TN','TN','TN','TN')").explain
== Physical Plan ==
*Filter state#6 IN (TN,TN,TN,TN,TN,TN,TN)
+- Generate explode([CA,TN]), false, false, [state#6]
   +- Scan OneRowRelation[]
```

**After**
```scala
scala> sql("select state from (select explode(array('CA','TN')) state) where state in ('TN','TN','TN','TN','TN','TN','TN')").explain
== Physical Plan ==
*Filter state#6 IN (TN)
+- Generate explode([CA,TN]), false, false, [state#6]
   +- Scan OneRowRelation[]
```

## How was this patch tested?

Passes the Jenkins tests (including a new test case).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13876 from dongjoon-hyun/SPARK-16174.
parent 6343f665
No related branches found
No related tags found
No related merge requests found
...@@ -132,6 +132,7 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate ...@@ -132,6 +132,7 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate
} }
override def children: Seq[Expression] = value +: list override def children: Seq[Expression] = value +: list
lazy val inSetConvertible = list.forall(_.isInstanceOf[Literal])
override def nullable: Boolean = children.exists(_.nullable) override def nullable: Boolean = children.exists(_.nullable)
override def foldable: Boolean = children.forall(_.foldable) override def foldable: Boolean = children.forall(_.foldable)
......
...@@ -820,16 +820,24 @@ object ConstantFolding extends Rule[LogicalPlan] { ...@@ -820,16 +820,24 @@ object ConstantFolding extends Rule[LogicalPlan] {
} }
/** /**
* Replaces [[In (value, seq[Literal])]] with optimized version[[InSet (value, HashSet[Literal])]] * Optimize IN predicates:
* which is much faster * 1. Removes literal repetitions.
* 2. Replaces [[In (value, seq[Literal])]] with optimized version
* [[InSet (value, HashSet[Literal])]] which is much faster.
*/ */
case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] { case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform { def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case q: LogicalPlan => q transformExpressionsDown { case q: LogicalPlan => q transformExpressionsDown {
case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) && case expr @ In(v, list) if expr.inSetConvertible =>
list.size > conf.optimizerInSetConversionThreshold => val newList = ExpressionSet(list).toSeq
val hSet = list.map(e => e.eval(EmptyRow)) if (newList.size > conf.optimizerInSetConversionThreshold) {
InSet(v, HashSet() ++ hSet) val hSet = newList.map(e => e.eval(EmptyRow))
InSet(v, HashSet() ++ hSet)
} else if (newList.size < list.size) {
expr.copy(list = newList)
} else { // newList.length == list.length
expr
}
} }
} }
} }
......
...@@ -42,6 +42,30 @@ class OptimizeInSuite extends PlanTest { ...@@ -42,6 +42,30 @@ class OptimizeInSuite extends PlanTest {
val testRelation = LocalRelation('a.int, 'b.int, 'c.int) val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
test("OptimizedIn test: Remove deterministic repetitions") {
  // IN list made only of literals, with 1 and 2 each repeated three times.
  val repeatedLiterals = In(
    UnresolvedAttribute("a"),
    Seq(Literal(1), Literal(1), Literal(2), Literal(2), Literal(1), Literal(2)))

  // IN list of non-literal expressions, each written twice. Built through a
  // def so the input plan and the expected plan get separately constructed
  // expression instances.
  def repeatedNonLiterals: In = In(
    UnresolvedAttribute("b"),
    Seq(
      UnresolvedAttribute("a"), UnresolvedAttribute("a"),
      Round(UnresolvedAttribute("a"), 0), Round(UnresolvedAttribute("a"), 0),
      Rand(0), Rand(0)))

  val originalQuery = testRelation
    .where(repeatedLiterals)
    .where(repeatedNonLiterals)
    .analyze

  // NOTE(review): originalQuery is already analyzed above; this second
  // analyze looks redundant -- confirm before removing.
  val optimized = Optimize.execute(originalQuery.analyze)

  // Expected: the literal list collapses to (1, 2); the non-literal list is
  // left exactly as written.
  val dedupedLiterals = In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2)))
  val correctAnswer = testRelation
    .where(dedupedLiterals)
    .where(repeatedNonLiterals)
    .analyze

  comparePlans(optimized, correctAnswer)
}
test("OptimizedIn test: In clause not optimized to InSet when less than 10 items") { test("OptimizedIn test: In clause not optimized to InSet when less than 10 items") {
val originalQuery = val originalQuery =
testRelation testRelation
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment