From 67d06880e47e0324409cf7e5b21db1dcb0107b82 Mon Sep 17 00:00:00 2001
From: Volodymyr Lyubinets <vlyubin@gmail.com>
Date: Fri, 10 Apr 2015 16:27:56 -0700
Subject: [PATCH] [SQL] [SPARK-6620] Speed up toDF() and rdd() functions by
 constructing converters in ScalaReflection

cc marmbrus

Author: Volodymyr Lyubinets <vlyubin@gmail.com>

Closes #5279 from vlyubin/speedup and squashes the following commits:

e75a387 [Volodymyr Lyubinets] Changes to ScalaUDF
11a20ec [Volodymyr Lyubinets] Avoid creating a tuple
c327bc9 [Volodymyr Lyubinets] Moved the only remaining function from DataTypeConversions to DateUtils
dec6802 [Volodymyr Lyubinets] Addresed review feedback
74301fa [Volodymyr Lyubinets] Addressed review comments
afa3aa5 [Volodymyr Lyubinets] Minor refactoring, added license, removed debug output
881dc60 [Volodymyr Lyubinets] Moved to a separate module; addressed review comments; one extra place of usage; changed behaviour for Java
8cad6e2 [Volodymyr Lyubinets] Addressed review commments
41b2aa9 [Volodymyr Lyubinets] Creating converters for ScalaReflection stuff, and more
---
 .../spark/ml/feature/TokenizerSuite.scala     |  17 +-
 .../sql/catalyst/CatalystTypeConverters.scala | 295 +++++++
 .../spark/sql/catalyst/ScalaReflection.scala  |  55 --
 .../sql/catalyst/expressions/ScalaUdf.scala   | 819 ++++++++++++------
 .../plans/logical/LocalRelation.scala         |   7 +-
 .../spark/sql/types/DataTypeConversions.scala |  77 --
 .../apache/spark/sql/types/DateUtils.scala    |  29 +
 .../sql/catalyst/ScalaReflectionSuite.scala   |   4 +-
 .../org/apache/spark/sql/DataFrame.scala      |  11 +-
 .../org/apache/spark/sql/SQLContext.scala     |   9 +-
 .../spark/sql/execution/ExistingRDD.scala     |  14 +-
 .../spark/sql/execution/LocalTableScan.scala  |  16 +-
 .../spark/sql/execution/SparkPlan.scala       |  11 +-
 .../spark/sql/execution/basicOperators.scala  |   9 +-
 .../org/apache/spark/sql/json/JsonRDD.scala   |   4 +-
 .../apache/spark/sql/JavaDataFrameSuite.java  |  10 +-
 .../org/apache/spark/sql/json/JsonSuite.scala |   3 +-
 17 files changed, 929 insertions(+), 461 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
 delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeConversions.scala

diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala
index bf862b912d..d186ead8f5 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala
@@ -25,10 +25,7 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
 @BeanInfo
-case class TokenizerTestData(rawText: String, wantedTokens: Seq[String]) {
-  /** Constructor used in [[org.apache.spark.ml.feature.JavaTokenizerSuite]] */
-  def this(rawText: String, wantedTokens: Array[String]) = this(rawText, wantedTokens.toSeq)
-}
+case class TokenizerTestData(rawText: String, wantedTokens: Array[String])
 
 class RegexTokenizerSuite extends FunSuite with MLlibTestSparkContext {
   import org.apache.spark.ml.feature.RegexTokenizerSuite._
@@ -46,14 +43,14 @@ class RegexTokenizerSuite extends FunSuite with MLlibTestSparkContext {
       .setOutputCol("tokens")
 
     val dataset0 = sqlContext.createDataFrame(Seq(
-      TokenizerTestData("Test for tokenization.", Seq("Test", "for", "tokenization", ".")),
-      TokenizerTestData("Te,st. punct", Seq("Te", ",", "st", ".", "punct"))
+      TokenizerTestData("Test for tokenization.", Array("Test", "for", "tokenization", ".")),
+      TokenizerTestData("Te,st. punct", Array("Te", ",", "st", ".", "punct"))
     ))
     testRegexTokenizer(tokenizer, dataset0)
 
     val dataset1 = sqlContext.createDataFrame(Seq(
-      TokenizerTestData("Test for tokenization.", Seq("Test", "for", "tokenization")),
-      TokenizerTestData("Te,st. punct", Seq("punct"))
+      TokenizerTestData("Test for tokenization.", Array("Test", "for", "tokenization")),
+      TokenizerTestData("Te,st. punct", Array("punct"))
     ))
 
     tokenizer.setMinTokenLength(3)
@@ -64,8 +61,8 @@ class RegexTokenizerSuite extends FunSuite with MLlibTestSparkContext {
       .setGaps(true)
       .setMinTokenLength(0)
     val dataset2 = sqlContext.createDataFrame(Seq(
-      TokenizerTestData("Test for tokenization.", Seq("Test", "for", "tokenization.")),
-      TokenizerTestData("Te,st.  punct", Seq("Te,st.", "", "punct"))
+      TokenizerTestData("Test for tokenization.", Array("Test", "for", "tokenization.")),
+      TokenizerTestData("Te,st.  punct", Array("Te,st.", "", "punct"))
     ))
     testRegexTokenizer(tokenizer, dataset2)
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
new file mode 100644
index 0000000000..91976fef6d
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import java.util.{Map => JavaMap}
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * Functions to convert Scala types to Catalyst types and vice versa.
+ */
+object CatalystTypeConverters {
+  // The Predef.Map is scala.collection.immutable.Map.
+  // Since the map values can be mutable, we explicitly import scala.collection.Map at here.
+  import scala.collection.Map
+
+  /**
+   * Converts Scala objects to catalyst rows / types. This method is slow, and for batch
+   * conversion you should be using converter produced by createToCatalystConverter.
+   * Note: This is always called after schemaFor has been called.
+   *       This ordering is important for UDT registration.
+   */
+  def convertToCatalyst(a: Any, dataType: DataType): Any = (a, dataType) match {
+    // Check UDT first since UDTs can override other types
+    case (obj, udt: UserDefinedType[_]) =>
+      udt.serialize(obj)
+
+    case (o: Option[_], _) =>
+      o.map(convertToCatalyst(_, dataType)).orNull
+
+    case (s: Seq[_], arrayType: ArrayType) =>
+      s.map(convertToCatalyst(_, arrayType.elementType))
+
+    case (s: Array[_], arrayType: ArrayType) =>
+      s.toSeq.map(convertToCatalyst(_, arrayType.elementType))
+
+    case (m: Map[_, _], mapType: MapType) =>
+      m.map { case (k, v) =>
+        convertToCatalyst(k, mapType.keyType) -> convertToCatalyst(v, mapType.valueType)
+      }
+
+    case (jmap: JavaMap[_, _], mapType: MapType) =>
+      val iter = jmap.entrySet.iterator
+      var listOfEntries: List[(Any, Any)] = List()
+      while (iter.hasNext) {
+        val entry = iter.next()
+        listOfEntries :+= (convertToCatalyst(entry.getKey, mapType.keyType),
+          convertToCatalyst(entry.getValue, mapType.valueType))
+      }
+      listOfEntries.toMap
+
+    case (p: Product, structType: StructType) =>
+      val ar = new Array[Any](structType.size)
+      val iter = p.productIterator
+      var idx = 0
+      while (idx < structType.size) {
+        ar(idx) = convertToCatalyst(iter.next(), structType.fields(idx).dataType)
+        idx += 1
+      }
+      new GenericRowWithSchema(ar, structType)
+
+    case (d: BigDecimal, _) =>
+      Decimal(d)
+
+    case (d: java.math.BigDecimal, _) =>
+      Decimal(d)
+
+    case (d: java.sql.Date, _) =>
+      DateUtils.fromJavaDate(d)
+
+    case (r: Row, structType: StructType) =>
+      val converters = structType.fields.map {
+        f => (item: Any) => convertToCatalyst(item, f.dataType)
+      }
+      convertRowWithConverters(r, structType, converters)
+
+    case (other, _) =>
+      other
+  }
+
+  /**
+   * Creates a converter function that will convert Scala objects to the specified catalyst type.
+   * Typical use case would be converting a collection of rows that have the same schema. You will
+   * call this function once to get a converter, and apply it to every row.
+   */
+  private[sql] def createToCatalystConverter(dataType: DataType): Any => Any = {
+    def extractOption(item: Any): Any = item match {
+      case opt: Option[_] => opt.orNull
+      case other => other
+    }
+
+    dataType match {
+      // Check UDT first since UDTs can override other types
+      case udt: UserDefinedType[_] =>
+        (item) => extractOption(item) match {
+          case null => null
+          case other => udt.serialize(other)
+        }
+
+      case arrayType: ArrayType =>
+        val elementConverter = createToCatalystConverter(arrayType.elementType)
+        (item: Any) => {
+          extractOption(item) match {
+            case a: Array[_] => a.toSeq.map(elementConverter)
+            case s: Seq[_] => s.map(elementConverter)
+            case null => null
+          }
+        }
+
+      case mapType: MapType =>
+        val keyConverter = createToCatalystConverter(mapType.keyType)
+        val valueConverter = createToCatalystConverter(mapType.valueType)
+        (item: Any) => {
+          extractOption(item) match {
+            case m: Map[_, _] =>
+              m.map { case (k, v) =>
+                keyConverter(k) -> valueConverter(v)
+              }
+
+            case jmap: JavaMap[_, _] =>
+              val iter = jmap.entrySet.iterator
+              val convertedMap: HashMap[Any, Any] = HashMap()
+              while (iter.hasNext) {
+                val entry = iter.next()
+                convertedMap(keyConverter(entry.getKey)) = valueConverter(entry.getValue)
+              }
+              convertedMap
+
+            case null => null
+          }
+        }
+
+      case structType: StructType =>
+        val converters = structType.fields.map(f => createToCatalystConverter(f.dataType))
+        (item: Any) => {
+          extractOption(item) match {
+            case r: Row =>
+              convertRowWithConverters(r, structType, converters)
+
+            case p: Product =>
+              val ar = new Array[Any](structType.size)
+              val iter = p.productIterator
+              var idx = 0
+              while (idx < structType.size) {
+                ar(idx) = converters(idx)(iter.next())
+                idx += 1
+              }
+              new GenericRowWithSchema(ar, structType)
+
+            case null =>
+              null
+          }
+        }
+
+      case dateType: DateType => (item: Any) => extractOption(item) match {
+        case d: java.sql.Date => DateUtils.fromJavaDate(d)
+        case other => other
+      }
+
+      case _ =>
+        (item: Any) => extractOption(item) match {
+          case d: BigDecimal => Decimal(d)
+          case d: java.math.BigDecimal => Decimal(d)
+          case other => other
+        }
+    }
+  }
+
+  /** 
+   * Converts Catalyst types used internally in rows to standard Scala types
+   * This method is slow, and for batch conversion you should be using converter
+   * produced by createToScalaConverter.
+   */
+  def convertToScala(a: Any, dataType: DataType): Any = (a, dataType) match {
+    // Check UDT first since UDTs can override other types
+    case (d, udt: UserDefinedType[_]) =>
+      udt.deserialize(d)
+
+    case (s: Seq[_], arrayType: ArrayType) =>
+      s.map(convertToScala(_, arrayType.elementType))
+
+    case (m: Map[_, _], mapType: MapType) =>
+      m.map { case (k, v) =>
+        convertToScala(k, mapType.keyType) -> convertToScala(v, mapType.valueType)
+      }
+
+    case (r: Row, s: StructType) =>
+      convertRowToScala(r, s)
+
+    case (d: Decimal, _: DecimalType) =>
+      d.toJavaBigDecimal
+
+    case (i: Int, DateType) =>
+      DateUtils.toJavaDate(i)
+
+    case (other, _) =>
+      other
+  }
+
+  /**
+   * Creates a converter function that will convert Catalyst types to Scala type.
+   * Typical use case would be converting a collection of rows that have the same schema. You will
+   * call this function once to get a converter, and apply it to every row.
+   */
+  private[sql] def createToScalaConverter(dataType: DataType): Any => Any = dataType match {
+    // Check UDT first since UDTs can override other types
+    case udt: UserDefinedType[_] =>
+      (item: Any) => if (item == null) null else udt.deserialize(item)
+
+    case arrayType: ArrayType =>
+      val elementConverter = createToScalaConverter(arrayType.elementType)
+      (item: Any) => if (item == null) null else item.asInstanceOf[Seq[_]].map(elementConverter)
+
+    case mapType: MapType =>
+      val keyConverter = createToScalaConverter(mapType.keyType)
+      val valueConverter = createToScalaConverter(mapType.valueType)
+      (item: Any) => if (item == null) {
+        null
+      } else {
+        item.asInstanceOf[Map[_, _]].map { case (k, v) =>
+          keyConverter(k) -> valueConverter(v)
+        }
+      }
+
+    case s: StructType =>
+      val converters = s.fields.map(f => createToScalaConverter(f.dataType))
+      (item: Any) => {
+        if (item == null) {
+          null
+        } else {
+          convertRowWithConverters(item.asInstanceOf[Row], s, converters)
+        }
+      }
+
+    case _: DecimalType =>
+      (item: Any) => item match {
+        case d: Decimal => d.toJavaBigDecimal
+        case other => other
+      }
+
+    case DateType =>
+      (item: Any) => item match {
+        case i: Int => DateUtils.toJavaDate(i)
+        case other => other
+      }
+
+    case other =>
+      (item: Any) => item
+  }
+
+  def convertRowToScala(r: Row, schema: StructType): Row = {
+    val ar = new Array[Any](r.size)
+    var idx = 0
+    while (idx < r.size) {
+      ar(idx) = convertToScala(r(idx), schema.fields(idx).dataType)
+      idx += 1
+    }
+    new GenericRowWithSchema(ar, schema)
+  }
+
+  /**
+   * Converts a row by applying the provided set of converter functions. It is used for both
+   * toScala and toCatalyst conversions.
+   */
+  private[sql] def convertRowWithConverters(
+      row: Row,
+      schema: StructType,
+      converters: Array[Any => Any]): Row = {
+    val ar = new Array[Any](row.size)
+    var idx = 0
+    while (idx < row.size) {
+      ar(idx) = converters(idx)(row(idx))
+      idx += 1
+    }
+    new GenericRowWithSchema(ar, schema)
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 8bfd0471d9..01d5c15122 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -46,61 +46,6 @@ trait ScalaReflection {
 
   case class Schema(dataType: DataType, nullable: Boolean)
 
-  /**
-   * Converts Scala objects to catalyst rows / types.
-   * Note: This is always called after schemaFor has been called.
-   *       This ordering is important for UDT registration.
-   */
-  def convertToCatalyst(a: Any, dataType: DataType): Any = (a, dataType) match {
-    // Check UDT first since UDTs can override other types
-    case (obj, udt: UserDefinedType[_]) => udt.serialize(obj)
-    case (o: Option[_], _) => o.map(convertToCatalyst(_, dataType)).orNull
-    case (s: Seq[_], arrayType: ArrayType) => s.map(convertToCatalyst(_, arrayType.elementType))
-    case (s: Array[_], arrayType: ArrayType) => if (arrayType.elementType.isPrimitive) {
-      s.toSeq
-    } else {
-      s.toSeq.map(convertToCatalyst(_, arrayType.elementType))
-    }
-    case (m: Map[_, _], mapType: MapType) => m.map { case (k, v) =>
-      convertToCatalyst(k, mapType.keyType) -> convertToCatalyst(v, mapType.valueType)
-    }
-    case (p: Product, structType: StructType) =>
-      new GenericRow(
-        p.productIterator.toSeq.zip(structType.fields).map { case (elem, field) =>
-          convertToCatalyst(elem, field.dataType)
-        }.toArray)
-    case (d: BigDecimal, _) => Decimal(d)
-    case (d: java.math.BigDecimal, _) => Decimal(d)
-    case (d: java.sql.Date, _) => DateUtils.fromJavaDate(d)
-    case (r: Row, structType: StructType) =>
-      new GenericRow(
-        r.toSeq.zip(structType.fields).map { case (elem, field) =>
-          convertToCatalyst(elem, field.dataType)
-        }.toArray)
-    case (other, _) => other
-  }
-
-  /** Converts Catalyst types used internally in rows to standard Scala types */
-  def convertToScala(a: Any, dataType: DataType): Any = (a, dataType) match {
-    // Check UDT first since UDTs can override other types
-    case (d, udt: UserDefinedType[_]) => udt.deserialize(d)
-    case (s: Seq[_], arrayType: ArrayType) => s.map(convertToScala(_, arrayType.elementType))
-    case (m: Map[_, _], mapType: MapType) => m.map { case (k, v) =>
-      convertToScala(k, mapType.keyType) -> convertToScala(v, mapType.valueType)
-    }
-    case (r: Row, s: StructType) => convertRowToScala(r, s)
-    case (d: Decimal, _: DecimalType) => d.toJavaBigDecimal
-    case (i: Int, DateType) => DateUtils.toJavaDate(i)
-    case (other, _) => other
-  }
-
-  def convertRowToScala(r: Row, schema: StructType): Row = {
-    // TODO: This is very slow!!!
-    new GenericRowWithSchema(
-      r.toSeq.zip(schema.fields.map(_.dataType))
-        .map(r_dt => convertToScala(r_dt._1, r_dt._2)).toArray, schema)
-  }
-
   /** Returns a Sequence of attributes for the given case class type. */
   def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match {
     case Schema(s: StructType, _) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
index 389dc4f745..9a77ca624e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.types.DataType
 
 /**
@@ -39,12 +39,14 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
 
     (1 to 22).map { x =>
       val anys = (1 to x).map(x => "Any").reduce(_ + ", " + _)
-      val childs = (0 to x - 1).map(x => s"val child$x = children($x)").reduce(_ + "\n      " + _)
-      val evals = (0 to x - 1).map(x => s"ScalaReflection.convertToScala(child$x.eval(input), child$x.dataType)").reduce(_ + ",\n          " + _)
+      val childs = (0 to x - 1).map(x => s"val child$x = children($x)").reduce(_ + "\n  " + _)
+      lazy val converters = (0 to x - 1).map(x => s"lazy val converter$x = CatalystTypeConverters.createToScalaConverter(child$x.dataType)").reduce(_ + "\n  " + _)
+      val evals = (0 to x - 1).map(x => s"converter$x(child$x.eval(input))").reduce(_ + ",\n      " + _)
 
-      s"""    case $x =>
+      s"""case $x =>
       val func = function.asInstanceOf[($anys) => Any]
       $childs
+      $converters
       (input: Row) => {
         func(
           $evals)
@@ -60,51 +62,61 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       (input: Row) => {
         func()
       }
-      
+
     case 1 =>
       val func = function.asInstanceOf[(Any) => Any]
       val child0 = children(0)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType))
+          converter0(child0.eval(input)))
       }
-      
+
     case 2 =>
       val func = function.asInstanceOf[(Any, Any) => Any]
       val child0 = children(0)
       val child1 = children(1)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)))
       }
-      
+
     case 3 =>
       val func = function.asInstanceOf[(Any, Any, Any) => Any]
       val child0 = children(0)
       val child1 = children(1)
       val child2 = children(2)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)))
       }
-      
+
     case 4 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any) => Any]
       val child0 = children(0)
       val child1 = children(1)
       val child2 = children(2)
       val child3 = children(3)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)))
       }
-      
+
     case 5 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -112,15 +124,20 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child2 = children(2)
       val child3 = children(3)
       val child4 = children(4)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)))
       }
-      
+
     case 6 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -129,16 +146,22 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child3 = children(3)
       val child4 = children(4)
       val child5 = children(5)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)))
       }
-      
+
     case 7 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -148,17 +171,24 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child4 = children(4)
       val child5 = children(5)
       val child6 = children(6)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)))
       }
-      
+
     case 8 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -169,18 +199,26 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child5 = children(5)
       val child6 = children(6)
       val child7 = children(7)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType),
-          ScalaReflection.convertToScala(child7.eval(input), child7.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)),
+          converter7(child7.eval(input)))
       }
-      
+
     case 9 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -192,19 +230,28 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child6 = children(6)
       val child7 = children(7)
       val child8 = children(8)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
+      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType),
-          ScalaReflection.convertToScala(child7.eval(input), child7.dataType),
-          ScalaReflection.convertToScala(child8.eval(input), child8.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)),
+          converter7(child7.eval(input)),
+          converter8(child8.eval(input)))
       }
-      
+
     case 10 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -217,20 +264,30 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child7 = children(7)
       val child8 = children(8)
       val child9 = children(9)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
+      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
+      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType),
-          ScalaReflection.convertToScala(child7.eval(input), child7.dataType),
-          ScalaReflection.convertToScala(child8.eval(input), child8.dataType),
-          ScalaReflection.convertToScala(child9.eval(input), child9.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)),
+          converter7(child7.eval(input)),
+          converter8(child8.eval(input)),
+          converter9(child9.eval(input)))
       }
-      
+
     case 11 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -244,21 +301,32 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child8 = children(8)
       val child9 = children(9)
       val child10 = children(10)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
+      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
+      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
+      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType),
-          ScalaReflection.convertToScala(child7.eval(input), child7.dataType),
-          ScalaReflection.convertToScala(child8.eval(input), child8.dataType),
-          ScalaReflection.convertToScala(child9.eval(input), child9.dataType),
-          ScalaReflection.convertToScala(child10.eval(input), child10.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)),
+          converter7(child7.eval(input)),
+          converter8(child8.eval(input)),
+          converter9(child9.eval(input)),
+          converter10(child10.eval(input)))
       }
-      
+
     case 12 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -273,22 +341,34 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child9 = children(9)
       val child10 = children(10)
       val child11 = children(11)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
+      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
+      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
+      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
+      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType),
-          ScalaReflection.convertToScala(child7.eval(input), child7.dataType),
-          ScalaReflection.convertToScala(child8.eval(input), child8.dataType),
-          ScalaReflection.convertToScala(child9.eval(input), child9.dataType),
-          ScalaReflection.convertToScala(child10.eval(input), child10.dataType),
-          ScalaReflection.convertToScala(child11.eval(input), child11.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)),
+          converter7(child7.eval(input)),
+          converter8(child8.eval(input)),
+          converter9(child9.eval(input)),
+          converter10(child10.eval(input)),
+          converter11(child11.eval(input)))
       }
-      
+
     case 13 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -304,23 +384,36 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child10 = children(10)
       val child11 = children(11)
       val child12 = children(12)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
+      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
+      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
+      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
+      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
+      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType),
-          ScalaReflection.convertToScala(child7.eval(input), child7.dataType),
-          ScalaReflection.convertToScala(child8.eval(input), child8.dataType),
-          ScalaReflection.convertToScala(child9.eval(input), child9.dataType),
-          ScalaReflection.convertToScala(child10.eval(input), child10.dataType),
-          ScalaReflection.convertToScala(child11.eval(input), child11.dataType),
-          ScalaReflection.convertToScala(child12.eval(input), child12.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)),
+          converter7(child7.eval(input)),
+          converter8(child8.eval(input)),
+          converter9(child9.eval(input)),
+          converter10(child10.eval(input)),
+          converter11(child11.eval(input)),
+          converter12(child12.eval(input)))
       }
-      
+
     case 14 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -337,24 +430,38 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child11 = children(11)
       val child12 = children(12)
       val child13 = children(13)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
+      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
+      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
+      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
+      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
+      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
+      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType),
-          ScalaReflection.convertToScala(child7.eval(input), child7.dataType),
-          ScalaReflection.convertToScala(child8.eval(input), child8.dataType),
-          ScalaReflection.convertToScala(child9.eval(input), child9.dataType),
-          ScalaReflection.convertToScala(child10.eval(input), child10.dataType),
-          ScalaReflection.convertToScala(child11.eval(input), child11.dataType),
-          ScalaReflection.convertToScala(child12.eval(input), child12.dataType),
-          ScalaReflection.convertToScala(child13.eval(input), child13.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)),
+          converter7(child7.eval(input)),
+          converter8(child8.eval(input)),
+          converter9(child9.eval(input)),
+          converter10(child10.eval(input)),
+          converter11(child11.eval(input)),
+          converter12(child12.eval(input)),
+          converter13(child13.eval(input)))
       }
-      
+
     case 15 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -372,25 +479,40 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child12 = children(12)
       val child13 = children(13)
       val child14 = children(14)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
+      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
+      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
+      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
+      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
+      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
+      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
+      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType),
-          ScalaReflection.convertToScala(child7.eval(input), child7.dataType),
-          ScalaReflection.convertToScala(child8.eval(input), child8.dataType),
-          ScalaReflection.convertToScala(child9.eval(input), child9.dataType),
-          ScalaReflection.convertToScala(child10.eval(input), child10.dataType),
-          ScalaReflection.convertToScala(child11.eval(input), child11.dataType),
-          ScalaReflection.convertToScala(child12.eval(input), child12.dataType),
-          ScalaReflection.convertToScala(child13.eval(input), child13.dataType),
-          ScalaReflection.convertToScala(child14.eval(input), child14.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)),
+          converter7(child7.eval(input)),
+          converter8(child8.eval(input)),
+          converter9(child9.eval(input)),
+          converter10(child10.eval(input)),
+          converter11(child11.eval(input)),
+          converter12(child12.eval(input)),
+          converter13(child13.eval(input)),
+          converter14(child14.eval(input)))
       }
-      
+
     case 16 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -409,26 +531,42 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child13 = children(13)
       val child14 = children(14)
       val child15 = children(15)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
+      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
+      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
+      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
+      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
+      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
+      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
+      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
+      lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType),
-          ScalaReflection.convertToScala(child7.eval(input), child7.dataType),
-          ScalaReflection.convertToScala(child8.eval(input), child8.dataType),
-          ScalaReflection.convertToScala(child9.eval(input), child9.dataType),
-          ScalaReflection.convertToScala(child10.eval(input), child10.dataType),
-          ScalaReflection.convertToScala(child11.eval(input), child11.dataType),
-          ScalaReflection.convertToScala(child12.eval(input), child12.dataType),
-          ScalaReflection.convertToScala(child13.eval(input), child13.dataType),
-          ScalaReflection.convertToScala(child14.eval(input), child14.dataType),
-          ScalaReflection.convertToScala(child15.eval(input), child15.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)),
+          converter7(child7.eval(input)),
+          converter8(child8.eval(input)),
+          converter9(child9.eval(input)),
+          converter10(child10.eval(input)),
+          converter11(child11.eval(input)),
+          converter12(child12.eval(input)),
+          converter13(child13.eval(input)),
+          converter14(child14.eval(input)),
+          converter15(child15.eval(input)))
       }
-      
+
     case 17 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -448,27 +586,44 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child14 = children(14)
       val child15 = children(15)
       val child16 = children(16)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
+      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
+      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
+      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
+      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
+      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
+      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
+      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
+      lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
+      lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType),
-          ScalaReflection.convertToScala(child7.eval(input), child7.dataType),
-          ScalaReflection.convertToScala(child8.eval(input), child8.dataType),
-          ScalaReflection.convertToScala(child9.eval(input), child9.dataType),
-          ScalaReflection.convertToScala(child10.eval(input), child10.dataType),
-          ScalaReflection.convertToScala(child11.eval(input), child11.dataType),
-          ScalaReflection.convertToScala(child12.eval(input), child12.dataType),
-          ScalaReflection.convertToScala(child13.eval(input), child13.dataType),
-          ScalaReflection.convertToScala(child14.eval(input), child14.dataType),
-          ScalaReflection.convertToScala(child15.eval(input), child15.dataType),
-          ScalaReflection.convertToScala(child16.eval(input), child16.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)),
+          converter7(child7.eval(input)),
+          converter8(child8.eval(input)),
+          converter9(child9.eval(input)),
+          converter10(child10.eval(input)),
+          converter11(child11.eval(input)),
+          converter12(child12.eval(input)),
+          converter13(child13.eval(input)),
+          converter14(child14.eval(input)),
+          converter15(child15.eval(input)),
+          converter16(child16.eval(input)))
       }
-      
+
     case 18 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -489,28 +644,46 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child15 = children(15)
       val child16 = children(16)
       val child17 = children(17)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
+      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
+      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
+      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
+      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
+      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
+      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
+      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
+      lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
+      lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
+      lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType),
-          ScalaReflection.convertToScala(child7.eval(input), child7.dataType),
-          ScalaReflection.convertToScala(child8.eval(input), child8.dataType),
-          ScalaReflection.convertToScala(child9.eval(input), child9.dataType),
-          ScalaReflection.convertToScala(child10.eval(input), child10.dataType),
-          ScalaReflection.convertToScala(child11.eval(input), child11.dataType),
-          ScalaReflection.convertToScala(child12.eval(input), child12.dataType),
-          ScalaReflection.convertToScala(child13.eval(input), child13.dataType),
-          ScalaReflection.convertToScala(child14.eval(input), child14.dataType),
-          ScalaReflection.convertToScala(child15.eval(input), child15.dataType),
-          ScalaReflection.convertToScala(child16.eval(input), child16.dataType),
-          ScalaReflection.convertToScala(child17.eval(input), child17.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)),
+          converter7(child7.eval(input)),
+          converter8(child8.eval(input)),
+          converter9(child9.eval(input)),
+          converter10(child10.eval(input)),
+          converter11(child11.eval(input)),
+          converter12(child12.eval(input)),
+          converter13(child13.eval(input)),
+          converter14(child14.eval(input)),
+          converter15(child15.eval(input)),
+          converter16(child16.eval(input)),
+          converter17(child17.eval(input)))
       }
-      
+
     case 19 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -532,29 +705,48 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child16 = children(16)
       val child17 = children(17)
       val child18 = children(18)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
+      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
+      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
+      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
+      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
+      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
+      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
+      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
+      lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
+      lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
+      lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
+      lazy val converter18 = CatalystTypeConverters.createToScalaConverter(child18.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType),
-          ScalaReflection.convertToScala(child7.eval(input), child7.dataType),
-          ScalaReflection.convertToScala(child8.eval(input), child8.dataType),
-          ScalaReflection.convertToScala(child9.eval(input), child9.dataType),
-          ScalaReflection.convertToScala(child10.eval(input), child10.dataType),
-          ScalaReflection.convertToScala(child11.eval(input), child11.dataType),
-          ScalaReflection.convertToScala(child12.eval(input), child12.dataType),
-          ScalaReflection.convertToScala(child13.eval(input), child13.dataType),
-          ScalaReflection.convertToScala(child14.eval(input), child14.dataType),
-          ScalaReflection.convertToScala(child15.eval(input), child15.dataType),
-          ScalaReflection.convertToScala(child16.eval(input), child16.dataType),
-          ScalaReflection.convertToScala(child17.eval(input), child17.dataType),
-          ScalaReflection.convertToScala(child18.eval(input), child18.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)),
+          converter7(child7.eval(input)),
+          converter8(child8.eval(input)),
+          converter9(child9.eval(input)),
+          converter10(child10.eval(input)),
+          converter11(child11.eval(input)),
+          converter12(child12.eval(input)),
+          converter13(child13.eval(input)),
+          converter14(child14.eval(input)),
+          converter15(child15.eval(input)),
+          converter16(child16.eval(input)),
+          converter17(child17.eval(input)),
+          converter18(child18.eval(input)))
       }
-      
+
     case 20 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -577,30 +769,50 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child17 = children(17)
       val child18 = children(18)
       val child19 = children(19)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
+      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
+      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
+      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
+      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
+      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
+      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
+      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
+      lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
+      lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
+      lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
+      lazy val converter18 = CatalystTypeConverters.createToScalaConverter(child18.dataType)
+      lazy val converter19 = CatalystTypeConverters.createToScalaConverter(child19.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType),
-          ScalaReflection.convertToScala(child7.eval(input), child7.dataType),
-          ScalaReflection.convertToScala(child8.eval(input), child8.dataType),
-          ScalaReflection.convertToScala(child9.eval(input), child9.dataType),
-          ScalaReflection.convertToScala(child10.eval(input), child10.dataType),
-          ScalaReflection.convertToScala(child11.eval(input), child11.dataType),
-          ScalaReflection.convertToScala(child12.eval(input), child12.dataType),
-          ScalaReflection.convertToScala(child13.eval(input), child13.dataType),
-          ScalaReflection.convertToScala(child14.eval(input), child14.dataType),
-          ScalaReflection.convertToScala(child15.eval(input), child15.dataType),
-          ScalaReflection.convertToScala(child16.eval(input), child16.dataType),
-          ScalaReflection.convertToScala(child17.eval(input), child17.dataType),
-          ScalaReflection.convertToScala(child18.eval(input), child18.dataType),
-          ScalaReflection.convertToScala(child19.eval(input), child19.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)),
+          converter7(child7.eval(input)),
+          converter8(child8.eval(input)),
+          converter9(child9.eval(input)),
+          converter10(child10.eval(input)),
+          converter11(child11.eval(input)),
+          converter12(child12.eval(input)),
+          converter13(child13.eval(input)),
+          converter14(child14.eval(input)),
+          converter15(child15.eval(input)),
+          converter16(child16.eval(input)),
+          converter17(child17.eval(input)),
+          converter18(child18.eval(input)),
+          converter19(child19.eval(input)))
       }
-      
+
     case 21 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -624,31 +836,52 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child18 = children(18)
       val child19 = children(19)
       val child20 = children(20)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
+      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
+      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
+      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
+      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
+      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
+      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
+      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
+      lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
+      lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
+      lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
+      lazy val converter18 = CatalystTypeConverters.createToScalaConverter(child18.dataType)
+      lazy val converter19 = CatalystTypeConverters.createToScalaConverter(child19.dataType)
+      lazy val converter20 = CatalystTypeConverters.createToScalaConverter(child20.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType),
-          ScalaReflection.convertToScala(child7.eval(input), child7.dataType),
-          ScalaReflection.convertToScala(child8.eval(input), child8.dataType),
-          ScalaReflection.convertToScala(child9.eval(input), child9.dataType),
-          ScalaReflection.convertToScala(child10.eval(input), child10.dataType),
-          ScalaReflection.convertToScala(child11.eval(input), child11.dataType),
-          ScalaReflection.convertToScala(child12.eval(input), child12.dataType),
-          ScalaReflection.convertToScala(child13.eval(input), child13.dataType),
-          ScalaReflection.convertToScala(child14.eval(input), child14.dataType),
-          ScalaReflection.convertToScala(child15.eval(input), child15.dataType),
-          ScalaReflection.convertToScala(child16.eval(input), child16.dataType),
-          ScalaReflection.convertToScala(child17.eval(input), child17.dataType),
-          ScalaReflection.convertToScala(child18.eval(input), child18.dataType),
-          ScalaReflection.convertToScala(child19.eval(input), child19.dataType),
-          ScalaReflection.convertToScala(child20.eval(input), child20.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)),
+          converter7(child7.eval(input)),
+          converter8(child8.eval(input)),
+          converter9(child9.eval(input)),
+          converter10(child10.eval(input)),
+          converter11(child11.eval(input)),
+          converter12(child12.eval(input)),
+          converter13(child13.eval(input)),
+          converter14(child14.eval(input)),
+          converter15(child15.eval(input)),
+          converter16(child16.eval(input)),
+          converter17(child17.eval(input)),
+          converter18(child18.eval(input)),
+          converter19(child19.eval(input)),
+          converter20(child20.eval(input)))
       }
-      
+
     case 22 =>
       val func = function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any]
       val child0 = children(0)
@@ -673,35 +906,57 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
       val child19 = children(19)
       val child20 = children(20)
       val child21 = children(21)
+      lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType)
+      lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType)
+      lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType)
+      lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType)
+      lazy val converter4 = CatalystTypeConverters.createToScalaConverter(child4.dataType)
+      lazy val converter5 = CatalystTypeConverters.createToScalaConverter(child5.dataType)
+      lazy val converter6 = CatalystTypeConverters.createToScalaConverter(child6.dataType)
+      lazy val converter7 = CatalystTypeConverters.createToScalaConverter(child7.dataType)
+      lazy val converter8 = CatalystTypeConverters.createToScalaConverter(child8.dataType)
+      lazy val converter9 = CatalystTypeConverters.createToScalaConverter(child9.dataType)
+      lazy val converter10 = CatalystTypeConverters.createToScalaConverter(child10.dataType)
+      lazy val converter11 = CatalystTypeConverters.createToScalaConverter(child11.dataType)
+      lazy val converter12 = CatalystTypeConverters.createToScalaConverter(child12.dataType)
+      lazy val converter13 = CatalystTypeConverters.createToScalaConverter(child13.dataType)
+      lazy val converter14 = CatalystTypeConverters.createToScalaConverter(child14.dataType)
+      lazy val converter15 = CatalystTypeConverters.createToScalaConverter(child15.dataType)
+      lazy val converter16 = CatalystTypeConverters.createToScalaConverter(child16.dataType)
+      lazy val converter17 = CatalystTypeConverters.createToScalaConverter(child17.dataType)
+      lazy val converter18 = CatalystTypeConverters.createToScalaConverter(child18.dataType)
+      lazy val converter19 = CatalystTypeConverters.createToScalaConverter(child19.dataType)
+      lazy val converter20 = CatalystTypeConverters.createToScalaConverter(child20.dataType)
+      lazy val converter21 = CatalystTypeConverters.createToScalaConverter(child21.dataType)
       (input: Row) => {
         func(
-          ScalaReflection.convertToScala(child0.eval(input), child0.dataType),
-          ScalaReflection.convertToScala(child1.eval(input), child1.dataType),
-          ScalaReflection.convertToScala(child2.eval(input), child2.dataType),
-          ScalaReflection.convertToScala(child3.eval(input), child3.dataType),
-          ScalaReflection.convertToScala(child4.eval(input), child4.dataType),
-          ScalaReflection.convertToScala(child5.eval(input), child5.dataType),
-          ScalaReflection.convertToScala(child6.eval(input), child6.dataType),
-          ScalaReflection.convertToScala(child7.eval(input), child7.dataType),
-          ScalaReflection.convertToScala(child8.eval(input), child8.dataType),
-          ScalaReflection.convertToScala(child9.eval(input), child9.dataType),
-          ScalaReflection.convertToScala(child10.eval(input), child10.dataType),
-          ScalaReflection.convertToScala(child11.eval(input), child11.dataType),
-          ScalaReflection.convertToScala(child12.eval(input), child12.dataType),
-          ScalaReflection.convertToScala(child13.eval(input), child13.dataType),
-          ScalaReflection.convertToScala(child14.eval(input), child14.dataType),
-          ScalaReflection.convertToScala(child15.eval(input), child15.dataType),
-          ScalaReflection.convertToScala(child16.eval(input), child16.dataType),
-          ScalaReflection.convertToScala(child17.eval(input), child17.dataType),
-          ScalaReflection.convertToScala(child18.eval(input), child18.dataType),
-          ScalaReflection.convertToScala(child19.eval(input), child19.dataType),
-          ScalaReflection.convertToScala(child20.eval(input), child20.dataType),
-          ScalaReflection.convertToScala(child21.eval(input), child21.dataType))
+          converter0(child0.eval(input)),
+          converter1(child1.eval(input)),
+          converter2(child2.eval(input)),
+          converter3(child3.eval(input)),
+          converter4(child4.eval(input)),
+          converter5(child5.eval(input)),
+          converter6(child6.eval(input)),
+          converter7(child7.eval(input)),
+          converter8(child8.eval(input)),
+          converter9(child9.eval(input)),
+          converter10(child10.eval(input)),
+          converter11(child11.eval(input)),
+          converter12(child12.eval(input)),
+          converter13(child13.eval(input)),
+          converter14(child14.eval(input)),
+          converter15(child15.eval(input)),
+          converter16(child16.eval(input)),
+          converter17(child17.eval(input)),
+          converter18(child18.eval(input)),
+          converter19(child19.eval(input)),
+          converter20(child20.eval(input)),
+          converter21(child21.eval(input)))
       }
   }
-  
+
   // scalastyle:on
-  
-  override def eval(input: Row): Any = ScalaReflection.convertToCatalyst(f(input), dataType)
+
+  override def eval(input: Row): Any = CatalystTypeConverters.convertToCatalyst(f(input), dataType)
 
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index bb79dc3405..e3e070f0ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, analysis}
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.types.{DataTypeConversions, StructType, StructField}
+import org.apache.spark.sql.types.{StructType, StructField}
 
 object LocalRelation {
   def apply(output: Attribute*): LocalRelation = new LocalRelation(output)
@@ -31,7 +31,8 @@ object LocalRelation {
 
   def fromProduct(output: Seq[Attribute], data: Seq[Product]): LocalRelation = {
     val schema = StructType.fromAttributes(output)
-    LocalRelation(output, data.map(row => DataTypeConversions.productToRow(row, schema)))
+    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+    LocalRelation(output, data.map(converter(_).asInstanceOf[Row]))
   }
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeConversions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeConversions.scala
deleted file mode 100644
index a9d63e7849..0000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeConversions.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.types
-
-import java.text.SimpleDateFormat
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-
-
-private[sql] object DataTypeConversions {
-
-  def productToRow(product: Product, schema: StructType): Row = {
-    val mutableRow = new GenericMutableRow(product.productArity)
-    val schemaFields = schema.fields.toArray
-
-    var i = 0
-    while (i < mutableRow.length) {
-      mutableRow(i) =
-        ScalaReflection.convertToCatalyst(product.productElement(i), schemaFields(i).dataType)
-      i += 1
-    }
-
-    mutableRow
-  }
-
-  def stringToTime(s: String): java.util.Date = {
-    if (!s.contains('T')) {
-      // JDBC escape string
-      if (s.contains(' ')) {
-        java.sql.Timestamp.valueOf(s)
-      } else {
-        java.sql.Date.valueOf(s)
-      }
-    } else if (s.endsWith("Z")) {
-      // this is zero timezone of ISO8601
-      stringToTime(s.substring(0, s.length - 1) + "GMT-00:00")
-    } else if (s.indexOf("GMT") == -1) {
-      // timezone with ISO8601
-      val inset = "+00.00".length
-      val s0 = s.substring(0, s.length - inset)
-      val s1 = s.substring(s.length - inset, s.length)
-      if (s0.substring(s0.lastIndexOf(':')).contains('.')) {
-        stringToTime(s0 + "GMT" + s1)
-      } else {
-        stringToTime(s0 + ".0GMT" + s1)
-      }
-    } else {
-      // ISO8601 with GMT insert
-      val ISO8601GMT: SimpleDateFormat = new SimpleDateFormat( "yyyy-MM-dd'T'HH:mm:ss.SSSz" )
-      ISO8601GMT.parse(s)
-    }
-  }
-
-  /** Converts Java objects to catalyst rows / types */
-  def convertJavaToCatalyst(a: Any, dataType: DataType): Any = (a, dataType) match {
-    case (obj, udt: UserDefinedType[_]) => ScalaReflection.convertToCatalyst(obj, udt) // Scala type
-    case (d: java.math.BigDecimal, _) => Decimal(d)
-    case (other, _) => other
-  }
-}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala
index 8a1a3b81b3..504fb05842 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.types
 
 import java.sql.Date
+import java.text.SimpleDateFormat
 import java.util.{Calendar, TimeZone}
 
 import org.apache.spark.sql.catalyst.expressions.Cast
@@ -57,4 +58,32 @@ object DateUtils {
   }
 
   def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days))
+
+  def stringToTime(s: String): java.util.Date = {
+    if (!s.contains('T')) {
+      // JDBC escape string
+      if (s.contains(' ')) {
+        java.sql.Timestamp.valueOf(s)
+      } else {
+        java.sql.Date.valueOf(s)
+      }
+    } else if (s.endsWith("Z")) {
+      // this is zero timezone of ISO8601
+      stringToTime(s.substring(0, s.length - 1) + "GMT-00:00")
+    } else if (s.indexOf("GMT") == -1) {
+      // timezone with ISO8601
+      val inset = "+00.00".length
+      val s0 = s.substring(0, s.length - inset)
+      val s1 = s.substring(s.length - inset, s.length)
+      if (s0.substring(s0.lastIndexOf(':')).contains('.')) {
+        stringToTime(s0 + "GMT" + s1)
+      } else {
+        stringToTime(s0 + ".0GMT" + s1)
+      }
+    } else {
+      // ISO8601 with GMT insert
+      val ISO8601GMT: SimpleDateFormat = new SimpleDateFormat( "yyyy-MM-dd'T'HH:mm:ss.SSSz" )
+      ISO8601GMT.parse(s)
+    }
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index eee00e3f7e..bbc0b661a0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -260,7 +260,7 @@ class ScalaReflectionSuite extends FunSuite {
     val data = PrimitiveData(1, 1, 1, 1, 1, 1, true)
     val convertedData = Row(1, 1.toLong, 1.toDouble, 1.toFloat, 1.toShort, 1.toByte, true)
     val dataType = schemaFor[PrimitiveData].dataType
-    assert(convertToCatalyst(data, dataType) === convertedData)
+    assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData)
   }
 
   test("convert Option[Product] to catalyst") {
@@ -270,7 +270,7 @@ class ScalaReflectionSuite extends FunSuite {
     val dataType = schemaFor[OptionalData].dataType
     val convertedData = Row(2, 2.toLong, 2.toDouble, 2.toFloat, 2.toShort, 2.toByte, true,
       Row(1, 1, 1, 1, 1, 1, true))
-    assert(convertToCatalyst(data, dataType) === convertedData)
+    assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData)
   }
 
   test("infer schema from case class with multiple constructors") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5c6016a4a2..9b9adf8550 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -33,7 +33,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.sql.catalyst.{ScalaReflection, SqlParser}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedRelation, ResolvedStar}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
@@ -713,7 +713,7 @@ class DataFrame private[sql](
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributes = schema.toAttributes
     val rowFunction =
-      f.andThen(_.map(ScalaReflection.convertToCatalyst(_, schema).asInstanceOf[Row]))
+      f.andThen(_.map(CatalystTypeConverters.convertToCatalyst(_, schema).asInstanceOf[Row]))
     val generator = UserDefinedGenerator(attributes, rowFunction, input.map(_.expr))
 
     Generate(generator, join = true, outer = false, None, logicalPlan)
@@ -734,7 +734,7 @@ class DataFrame private[sql](
     val dataType = ScalaReflection.schemaFor[B].dataType
     val attributes = AttributeReference(outputColumn, dataType)() :: Nil
     def rowFunction(row: Row): TraversableOnce[Row] = {
-      f(row(0).asInstanceOf[A]).map(o => Row(ScalaReflection.convertToCatalyst(o, dataType)))
+      f(row(0).asInstanceOf[A]).map(o => Row(CatalystTypeConverters.convertToCatalyst(o, dataType)))
     }
     val generator = UserDefinedGenerator(attributes, rowFunction, apply(inputColumn).expr :: Nil)
 
@@ -961,7 +961,10 @@ class DataFrame private[sql](
   lazy val rdd: RDD[Row] = {
     // use a local variable to make sure the map closure doesn't capture the whole DataFrame
     val schema = this.schema
-    queryExecution.executedPlan.execute().map(ScalaReflection.convertRowToScala(_, schema))
+    queryExecution.executedPlan.execute().mapPartitions { rows =>
+      val converter = CatalystTypeConverters.createToScalaConverter(schema)
+      rows.map(converter(_).asInstanceOf[Row])
+    }
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 39dd14e796..c25ef58e6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,9 +31,9 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, OneRowRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.catalyst.{ScalaReflection, expressions}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, expressions}
 import org.apache.spark.sql.execution.{Filter, _}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.json._
@@ -404,7 +404,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
     // TODO: use MutableProjection when rowRDD is another DataFrame and the applied
     // schema differs from the existing schema on any field data type.
     val catalystRows = if (needsConversion) {
-      rowRDD.map(ScalaReflection.convertToCatalyst(_, schema).asInstanceOf[Row])
+      val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+      rowRDD.map(converter(_).asInstanceOf[Row])
     } else {
       rowRDD
     }
@@ -459,7 +460,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       iter.map { row =>
         new GenericRow(
           extractors.zip(attributeSeq).map { case (e, attr) =>
-            DataTypeConversions.convertJavaToCatalyst(e.invoke(row), attr.dataType)
+            CatalystTypeConverters.convertToCatalyst(e.invoke(row), attr.dataType)
           }.toArray[Any]
         ) : Row
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index d8955725e5..656bdd7212 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -20,14 +20,12 @@ package org.apache.spark.sql.execution
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{SpecificMutableRow, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.types.StructType
 
-import scala.collection.immutable
-
 /**
  * :: DeveloperApi ::
  */
@@ -39,13 +37,15 @@ object RDDConversions {
         Iterator.empty
       } else {
         val bufferedIterator = iterator.buffered
-        val mutableRow = new GenericMutableRow(bufferedIterator.head.productArity)
+        val mutableRow = new SpecificMutableRow(schema.fields.map(_.dataType))
         val schemaFields = schema.fields.toArray
+        val converters = schemaFields.map {
+          f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
+        }
         bufferedIterator.map { r =>
           var i = 0
           while (i < mutableRow.length) {
-            mutableRow(i) =
-              ScalaReflection.convertToCatalyst(r.productElement(i), schemaFields(i).dataType)
+            mutableRow(i) = converters(i)(r.productElement(i))
             i += 1
           }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 5bd699a2fa..8a8c3a4043 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions.Attribute
 
 
@@ -32,9 +32,15 @@ case class LocalTableScan(output: Seq[Attribute], rows: Seq[Row]) extends LeafNo
 
   override def execute(): RDD[Row] = rdd
 
-  override def executeCollect(): Array[Row] =
-    rows.map(ScalaReflection.convertRowToScala(_, schema)).toArray
 
-  override def executeTake(limit: Int): Array[Row] =
-    rows.map(ScalaReflection.convertRowToScala(_, schema)).take(limit).toArray
+  override def executeCollect(): Array[Row] = {
+    val converter = CatalystTypeConverters.createToScalaConverter(schema)
+    rows.map(converter(_).asInstanceOf[Row]).toArray
+  }
+
+
+  override def executeTake(limit: Int): Array[Row] = {
+    val converter = CatalystTypeConverters.createToScalaConverter(schema)
+    rows.map(converter(_).asInstanceOf[Row]).take(limit).toArray
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index d239637cd4..fabcf6b4a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.{ScalaReflection, trees}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -80,8 +80,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /**
    * Runs this query returning the result as an array.
    */
+
   def executeCollect(): Array[Row] = {
-    execute().map(ScalaReflection.convertRowToScala(_, schema)).collect()
+    execute().mapPartitions { iter =>
+      val converter = CatalystTypeConverters.createToScalaConverter(schema)
+      iter.map(converter(_).asInstanceOf[Row])
+    }.collect()
   }
 
   /**
@@ -125,7 +129,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       partsScanned += numPartsToTry
     }
 
-    buf.toArray.map(ScalaReflection.convertRowToScala(_, this.schema))
+    val converter = CatalystTypeConverters.createToScalaConverter(schema)
+    buf.toArray.map(converter(_).asInstanceOf[Row])
   }
 
   protected def newProjection(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 1f5251a203..6eec520abf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -21,7 +21,7 @@ import org.apache.spark.{SparkEnv, HashPartitioner, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
 import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
@@ -139,9 +139,10 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)
 
   private def collectData(): Array[Row] = child.execute().map(_.copy()).takeOrdered(limit)(ord)
 
-  // TODO: Is this copying for no reason?
-  override def executeCollect(): Array[Row] =
-    collectData().map(ScalaReflection.convertRowToScala(_, this.schema))
+  override def executeCollect(): Array[Row] = {
+    val converter = CatalystTypeConverters.createToScalaConverter(schema)
+    collectData().map(converter(_).asInstanceOf[Row])
+  }
 
   // TODO: Terminal split should be implemented differently from non-terminal split.
   // TODO: Pick num splits based on |limit|.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 0b770f2251..b1e8521383 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -391,7 +391,7 @@ private[sql] object JsonRDD extends Logging {
     value match {
       // only support string as date
       case value: java.lang.String =>
-        DateUtils.millisToDays(DataTypeConversions.stringToTime(value).getTime)
+        DateUtils.millisToDays(DateUtils.stringToTime(value).getTime)
       case value: java.sql.Date => DateUtils.fromJavaDate(value)
     }
   }
@@ -400,7 +400,7 @@ private[sql] object JsonRDD extends Logging {
     value match {
       case value: java.lang.Integer => new Timestamp(value.asInstanceOf[Int].toLong)
       case value: java.lang.Long => new Timestamp(value)
-      case value: java.lang.String => toTimestamp(DataTypeConversions.stringToTime(value).getTime)
+      case value: java.lang.String => toTimestamp(DateUtils.stringToTime(value).getTime)
     }
   }
 
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index 1ff2d5a190..6d0fbe83c2 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -20,6 +20,8 @@ package test.org.apache.spark.sql;
 import java.io.Serializable;
 import java.util.Arrays;
 
+import scala.collection.Seq;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -127,6 +129,12 @@ public class JavaDataFrameSuite {
       schema.apply("b"));
     Row first = df.select("a", "b").first();
     Assert.assertEquals(bean.getA(), first.getDouble(0), 0.0);
-    Assert.assertArrayEquals(bean.getB(), first.<Integer[]>getAs(1));
+    // Now Java lists and maps are converetd to Scala Seq's and Map's. Once we get a Seq below,
+    // verify that it has the expected length, and contains expected elements.
+    Seq<Integer> result = first.getAs(1);
+    Assert.assertEquals(bean.getB().length, result.length());
+    for (int i = 0; i < result.length(); i++) {
+      Assert.assertEquals(bean.getB()[i], result.apply(i));
+    }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 1fe0b76c00..fd0e2746dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -895,8 +895,7 @@ class JsonSuite extends QueryTest {
     )
   }
 
-  test("SPARK-4228 DataFrame to JSON")
-  {
+  test("SPARK-4228 DataFrame to JSON") {
     val schema1 = StructType(
       StructField("f1", IntegerType, false) ::
       StructField("f2", StringType, false) ::
-- 
GitLab