From ba68a51c407197d478b330403af8fe24a176bef3 Mon Sep 17 00:00:00 2001
From: Daoyuan Wang <daoyuan.wang@intel.com>
Date: Fri, 19 Sep 2014 15:39:31 -0700
Subject: [PATCH] [SPARK-3485][SQL] Use GenericUDFUtils.ConversionHelper for
 Simple UDF type conversions

This is an alternative solution to SPARK-3485, in addition to PR #2355.
In this patch, we use ConversionHelper and FunctionRegistry to invoke a simple UDF's evaluate method. This relies more on Hive, but is much cleaner and safer.
We can discuss which approach is better.
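
For reference, below is a minimal standalone sketch (not part of the patch) of the conversion path this change switches to. The PlusOne UDF and its evaluate signature are hypothetical; ConversionHelper, FunctionRegistry.invoke, and the ObjectInspector factory are the Hive APIs the new code path actually uses:

    import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, UDF}
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

    // Hypothetical simple UDF: evaluate() declares a java.lang.Double parameter.
    class PlusOne extends UDF {
      def evaluate(x: java.lang.Double): java.lang.Double = x + 1.0
    }

    object ConversionHelperSketch {
      def main(args: Array[String]): Unit = {
        val udf = new PlusOne
        val method = classOf[PlusOne].getMethod("evaluate", classOf[java.lang.Double])

        // Inspectors describe the argument types actually supplied at runtime:
        // here an int, which does not match the Double parameter of evaluate().
        val argumentInspectors: Array[ObjectInspector] =
          Array(PrimitiveObjectInspectorFactory.javaIntObjectInspector)

        // ConversionHelper compares the method signature with the supplied
        // inspectors and builds converters wherever they disagree.
        val helper = new ConversionHelper(method, argumentInspectors)

        // convertIfNecessary coerces Integer -> java.lang.Double, and
        // FunctionRegistry.invoke reflectively calls evaluate() on the UDF.
        val converted = helper.convertIfNecessary(Int.box(41))
        val result = FunctionRegistry.invoke(method, udf, converted: _*)
        println(result)  // 42.0
      }
    }

This mirrors what HiveSimpleUdf now does per row: build the ConversionHelper once from the method and the children's inspectors, then convert and invoke on each eval, instead of hand-maintaining a wrapper table of boxable types.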

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #2407 from adrian-wang/simpleudf and squashes the following commits:

15762d2 [Daoyuan Wang] add udf_pmod test, which previously failed but now passes
0d69eb4 [Daoyuan Wang] another way to pass arguments to a Hive simple UDF
---
 .../execution/HiveCompatibilitySuite.scala    |  1 +
 .../org/apache/spark/sql/hive/hiveUdfs.scala  | 55 ++++++-------------
 2 files changed, 17 insertions(+), 39 deletions(-)

diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index ab487d673e..556c984ad3 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -801,6 +801,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "udf_or",
     "udf_parse_url",
     "udf_PI",
+    "udf_pmod",
     "udf_positive",
     "udf_pow",
     "udf_power",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index 7cda0dd302..5a0e6c5cc1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
@@ -105,52 +107,27 @@ private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[
     function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo))
 
   @transient
-  lazy val dataType = javaClassToDataType(method.getReturnType)
+  protected lazy val arguments = children.map(c => toInspector(c.dataType)).toArray
 
-  protected lazy val wrappers: Array[(Any) => AnyRef] = method.getParameterTypes.map { argClass =>
-    val primitiveClasses = Seq(
-      Integer.TYPE, classOf[java.lang.Integer], classOf[java.lang.String], java.lang.Double.TYPE,
-      classOf[java.lang.Double], java.lang.Long.TYPE, classOf[java.lang.Long],
-      classOf[HiveDecimal], java.lang.Byte.TYPE, classOf[java.lang.Byte],
-      classOf[java.sql.Timestamp]
-    )
-    val matchingConstructor = argClass.getConstructors.find { c =>
-      c.getParameterTypes.size == 1 && primitiveClasses.contains(c.getParameterTypes.head)
-    }
+  // Create parameter converters
+  @transient
+  protected lazy val conversionHelper = new ConversionHelper(method, arguments)
 
-    matchingConstructor match {
-      case Some(constructor) =>
-        (a: Any) => {
-          logDebug(
-            s"Wrapping $a of type ${if (a == null) "null" else a.getClass.getName} $constructor.")
-          // We must make sure that primitives get boxed java style.
-          if (a == null) {
-            null
-          } else {
-            constructor.newInstance(a match {
-              case i: Int => i: java.lang.Integer
-              case bd: BigDecimal => new HiveDecimal(bd.underlying())
-              case other: AnyRef => other
-            }).asInstanceOf[AnyRef]
-          }
-        }
-      case None =>
-        (a: Any) => a match {
-          case wrapper => wrap(wrapper)
-        }
-    }
+  @transient
+  lazy val dataType = javaClassToDataType(method.getReturnType)
+
+  def catalystToHive(value: Any): Object = value match {
+    // TODO need more types here? or can we use wrap()
+    case bd: BigDecimal => new HiveDecimal(bd.underlying())
+    case d => d.asInstanceOf[Object]
   }
 
   // TODO: Finish input output types.
   override def eval(input: Row): Any = {
-    val evaluatedChildren = children.map(_.eval(input))
-    // Wrap the function arguments in the expected types.
-    val args = evaluatedChildren.zip(wrappers).map {
-      case (arg, wrapper) => wrapper(arg)
-    }
+    val evaluatedChildren = children.map(c => catalystToHive(c.eval(input)))
 
-    // Invoke the udf and unwrap the result.
-    unwrap(method.invoke(function, args: _*))
+    unwrap(FunctionRegistry.invoke(method, function, conversionHelper
+      .convertIfNecessary(evaluatedChildren: _*): _*))
   }
 }
 
-- 
GitLab