From 52ef76de219c4bf19c54c99414b89a67d0bf457b Mon Sep 17 00:00:00 2001
From: Wenchen Fan <cloud0fan@outlook.com>
Date: Thu, 23 Jul 2015 09:37:53 -0700
Subject: [PATCH] [SPARK-9082] [SQL] [FOLLOW-UP] use `partition` in
 `PushPredicateThroughProject`

a follow up of https://github.com/apache/spark/pull/7446

Author: Wenchen Fan <cloud0fan@outlook.com>

Closes #7607 from cloud-fan/tmp and squashes the following commits:

7106989 [Wenchen Fan] use `partition` in `PushPredicateThroughProject`
---
 .../sql/catalyst/optimizer/Optimizer.scala    | 22 +++++++------------
 1 file changed, 8 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d2db3dd3d0..b59f800e7c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -553,33 +553,27 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelpe
       // Split the condition into small conditions by `And`, so that we can push down part of this
       // condition without nondeterministic expressions.
       val andConditions = splitConjunctivePredicates(condition)
-      val nondeterministicConditions = andConditions.filter(hasNondeterministic(_, aliasMap))
+
+      val (deterministic, nondeterministic) = andConditions.partition(_.collect {
+        case a: Attribute if aliasMap.contains(a) => aliasMap(a)
+      }.forall(_.deterministic))
 
       // If there is no nondeterministic conditions, push down the whole condition.
-      if (nondeterministicConditions.isEmpty) {
+      if (nondeterministic.isEmpty) {
         project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
       } else {
         // If they are all nondeterministic conditions, leave it un-changed.
-        if (nondeterministicConditions.length == andConditions.length) {
+        if (deterministic.isEmpty) {
           filter
         } else {
-          val deterministicConditions = andConditions.filterNot(hasNondeterministic(_, aliasMap))
           // Push down the small conditions without nondeterministic expressions.
-          val pushedCondition = deterministicConditions.map(replaceAlias(_, aliasMap)).reduce(And)
-          Filter(nondeterministicConditions.reduce(And),
+          val pushedCondition = deterministic.map(replaceAlias(_, aliasMap)).reduce(And)
+          Filter(nondeterministic.reduce(And),
             project.copy(child = Filter(pushedCondition, grandChild)))
         }
       }
   }
 
-  private def hasNondeterministic(
-      condition: Expression,
-      sourceAliases: AttributeMap[Expression]) = {
-    condition.collect {
-      case a: Attribute if sourceAliases.contains(a) => sourceAliases(a)
-    }.exists(!_.deterministic)
-  }
-
   // Substitute any attributes that are produced by the child projection, so that we safely
   // eliminate it.
   private def replaceAlias(condition: Expression, sourceAliases: AttributeMap[Expression]) = {
-- 
GitLab