From e526913989d6099064886ea3ed3f6a2a0376a4f8 Mon Sep 17 00:00:00 2001
From: Josh Rosen <joshrosen@databricks.com>
Date: Fri, 3 Jun 2016 13:53:02 -0700
Subject: [PATCH] [SPARK-15742][SQL] Reduce temp collections allocations in
 TreeNode transform methods

In Catalyst's TreeNode transform methods we end up calling `productIterator.map(...).toArray` in a number of places, which is slightly inefficient because it needs to allocate an `ArrayBuilder` and grow a temporary array. Since we already know the size of the final output (`productArity`), we can allocate the result array up-front and fill it with a while loop over `productElement`, avoiding the iterator and the temporary builder entirely.
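
As a standalone illustration, here is a minimal sketch of the pattern change (not Spark code; the hypothetical `mapProduct` mirrors the `mapProductIterator` helper added in the diff below):

```scala
import scala.reflect.ClassTag

object ProductMapSketch {
  // Before: Iterator.map + toArray allocates an ArrayBuilder and grows
  // a temporary array before producing the final result.
  def mapViaIterator(p: Product)(f: Any => Any): Array[Any] =
    p.productIterator.map(f).toArray

  // After: productArity gives the exact output size, so allocate once
  // and fill the array with a while loop over productElement.
  def mapProduct[B: ClassTag](p: Product)(f: Any => B): Array[B] = {
    val arr = Array.ofDim[B](p.productArity)
    var i = 0
    while (i < arr.length) {
      arr(i) = f(p.productElement(i))
      i += 1
    }
    arr
  }

  def main(args: Array[String]): Unit = {
    val tuple = ("a", 1, true) // any Product works, e.g. a case class
    // Both print "a, 1, true"; the second avoids the temporary builder.
    println(mapViaIterator(tuple)(identity).mkString(", "))
    println(mapProduct(tuple)(identity).mkString(", "))
  }
}
```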

For most workloads, this performance difference is negligible, but it does make a measurable difference in optimizer performance for queries that operate over very wide schemas (such as the benchmark queries in #13456).
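
For context, a rough sketch of how a measurement of this shape could be reproduced (assumes a Spark 2.x `spark-catalyst` dependency; this is not the actual #13456 harness, and `bestParseTimeMs` is a hypothetical helper):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

object WideSelectTiming {
  // Build a SELECT with `width` expressions and report the best
  // wall-clock parse time in milliseconds over `iters` runs.
  def bestParseTimeMs(width: Int, iters: Int = 5): Long = {
    val sql =
      "SELECT " + (1 to width).map(i => s"a + $i AS c$i").mkString(", ") + " FROM t"
    (1 to iters).map { _ =>
      val start = System.nanoTime()
      CatalystSqlParser.parsePlan(sql) // parse only; no analysis/optimization
      (System.nanoTime() - start) / 1000000
    }.min
  }

  def main(args: Array[String]): Unit = {
    Seq(1, 10, 100, 1000, 2500).foreach { n =>
      println(s"$n select expressions: ${bestParseTimeMs(n)} ms")
    }
  }
}
```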

### Perf results (from #13456 benchmarks)

**Before**

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz

parsing large select:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
1 select expressions                            19 /   22          0.0    19119858.0       1.0X
10 select expressions                           23 /   25          0.0    23208774.0       0.8X
100 select expressions                          55 /   73          0.0    54768402.0       0.3X
1000 select expressions                        229 /  259          0.0   228606373.0       0.1X
2500 select expressions                        530 /  554          0.0   529938178.0       0.0X
```

**After**

```
parsing large select:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
1 select expressions                            15 /   21          0.0    14978203.0       1.0X
10 select expressions                           22 /   27          0.0    22492262.0       0.7X
100 select expressions                          48 /   64          0.0    48449834.0       0.3X
1000 select expressions                        189 /  208          0.0   189346428.0       0.1X
2500 select expressions                        429 /  449          0.0   428943897.0       0.0X
```

Author: Josh Rosen <joshrosen@databricks.com>

Closes #13484 from JoshRosen/treenode-productiterator-map.
---
 .../spark/sql/catalyst/plans/QueryPlan.scala  |  6 ++---
 .../spark/sql/catalyst/trees/TreeNode.scala   | 26 ++++++++++++++-----
 2 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 6784c3ae1d..3de15a9a3f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -172,7 +172,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
       case null => null
     }
 
-    val newArgs = productIterator.map(recursiveTransform).toArray
+    val newArgs = mapProductIterator(recursiveTransform)
 
     if (changed) makeCopy(newArgs).asInstanceOf[this.type] else this
   }
@@ -206,7 +206,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
       case null => null
     }
 
-    val newArgs = productIterator.map(recursiveTransform).toArray
+    val newArgs = mapProductIterator(recursiveTransform)
 
     if (changed) makeCopy(newArgs).asInstanceOf[this.type] else this
   }
@@ -318,7 +318,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
       case other => other
     }
 
-    productIterator.map {
+    mapProductIterator {
       case s: Option[_] => s.map(cleanArg)
       case s: Seq[_] => s.map(cleanArg)
       case m: Map[_, _] => m.mapValues(cleanArg)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 50481cd298..22d82c6108 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -21,6 +21,7 @@ import java.util.UUID
 
 import scala.collection.Map
 import scala.collection.mutable.Stack
+import scala.reflect.ClassTag
 
 import org.apache.commons.lang.ClassUtils
 import org.json4s.JsonAST._
@@ -168,12 +169,25 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     }
   }
 
+  /**
+   * Efficient alternative to `productIterator.map(f).toArray`.
+   */
+  protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {
+    val arr = Array.ofDim[B](productArity)
+    var i = 0
+    while (i < arr.length) {
+      arr(i) = f(productElement(i))
+      i += 1
+    }
+    arr
+  }
+
   /**
    * Returns a copy of this node where `f` has been applied to all the nodes children.
    */
   def mapChildren(f: BaseType => BaseType): BaseType = {
     var changed = false
-    val newArgs = productIterator.map {
+    val newArgs = mapProductIterator {
       case arg: TreeNode[_] if containsChild(arg) =>
         val newChild = f(arg.asInstanceOf[BaseType])
         if (newChild fastEquals arg) {
@@ -184,7 +198,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
         }
       case nonChild: AnyRef => nonChild
       case null => null
-    }.toArray
+    }
     if (changed) makeCopy(newArgs) else this
   }
 
@@ -197,7 +211,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     var changed = false
     val remainingNewChildren = newChildren.toBuffer
     val remainingOldChildren = children.toBuffer
-    val newArgs = productIterator.map {
+    val newArgs = mapProductIterator {
       case s: StructType => s // Don't convert struct types to some other type of Seq[StructField]
       // Handle Seq[TreeNode] in TreeNode parameters.
       case s: Seq[_] => s.map {
@@ -237,7 +251,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
         }
       case nonChild: AnyRef => nonChild
       case null => null
-    }.toArray
+    }
 
     if (changed) makeCopy(newArgs) else this
   }
@@ -302,7 +316,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       rule: PartialFunction[BaseType, BaseType],
       nextOperation: (BaseType, PartialFunction[BaseType, BaseType]) => BaseType): BaseType = {
     var changed = false
-    val newArgs = productIterator.map {
+    val newArgs = mapProductIterator {
       case arg: TreeNode[_] if containsChild(arg) =>
         val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
         if (!(newChild fastEquals arg)) {
@@ -353,7 +367,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       }
       case nonChild: AnyRef => nonChild
       case null => null
-    }.toArray
+    }
     if (changed) makeCopy(newArgs) else this
   }
 
-- 
GitLab