diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index ab487d673e8138c7f764b5f5d398c4a5311a0829..556c984ad392b9703d91dbf1179f057e1c81c9e1 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -801,6 +801,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "udf_or",
     "udf_parse_url",
     "udf_PI",
+    "udf_pmod",
     "udf_positive",
     "udf_pow",
     "udf_power",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index 7cda0dd302c865342c38e9d00de28d3d92594125..5a0e6c5cc1bbace841f1b0a8e4eea9879813684e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive
 
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper
@@ -105,52 +106,30 @@ private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[
     function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo))
 
   @transient
-  lazy val dataType = javaClassToDataType(method.getReturnType)
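+  // ObjectInspectors describing each child expression's Catalyst type, used by ConversionHelper.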
+  protected lazy val arguments = children.map(c => toInspector(c.dataType)).toArray
 
-  protected lazy val wrappers: Array[(Any) => AnyRef] = method.getParameterTypes.map { argClass =>
-    val primitiveClasses = Seq(
-      Integer.TYPE, classOf[java.lang.Integer], classOf[java.lang.String], java.lang.Double.TYPE,
-      classOf[java.lang.Double], java.lang.Long.TYPE, classOf[java.lang.Long],
-      classOf[HiveDecimal], java.lang.Byte.TYPE, classOf[java.lang.Byte],
-      classOf[java.sql.Timestamp]
-    )
-    val matchingConstructor = argClass.getConstructors.find { c =>
-      c.getParameterTypes.size == 1 && primitiveClasses.contains(c.getParameterTypes.head)
-    }
+  // Hive's ConversionHelper coerces the evaluated arguments to the UDF method's parameter types.
+  @transient
+  protected lazy val conversionHelper = new ConversionHelper(method, arguments)
 
-    matchingConstructor match {
-      case Some(constructor) =>
-        (a: Any) => {
-          logDebug(
-            s"Wrapping $a of type ${if (a == null) "null" else a.getClass.getName} $constructor.")
-          // We must make sure that primitives get boxed java style.
-          if (a == null) {
-            null
-          } else {
-            constructor.newInstance(a match {
-              case i: Int => i: java.lang.Integer
-              case bd: BigDecimal => new HiveDecimal(bd.underlying())
-              case other: AnyRef => other
-            }).asInstanceOf[AnyRef]
-          }
-        }
-      case None =>
-        (a: Any) => a match {
-          case wrapper => wrap(wrapper)
-        }
-    }
+  @transient
+  lazy val dataType = javaClassToDataType(method.getReturnType)
+
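+  // Boxes a Catalyst value for Hive: BigDecimal must become HiveDecimal here;
+  // any remaining coercion is left to ConversionHelper.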
+  def catalystToHive(value: Any): Object = value match {
+    // TODO: do we need more types here, or can we reuse wrap()?
+    case bd: BigDecimal => new HiveDecimal(bd.underlying())
+    case d => d.asInstanceOf[Object]
   }
 
   // TODO: Finish input output types.
   override def eval(input: Row): Any = {
-    val evaluatedChildren = children.map(_.eval(input))
-    // Wrap the function arguments in the expected types.
-    val args = evaluatedChildren.zip(wrappers).map {
-      case (arg, wrapper) => wrapper(arg)
-    }
+    val evaluatedChildren = children.map(c => catalystToHive(c.eval(input)))
 
-    // Invoke the udf and unwrap the result.
-    unwrap(method.invoke(function, args: _*))
+    unwrap(FunctionRegistry.invoke(method, function,
+      conversionHelper.convertIfNecessary(evaluatedChildren: _*): _*))
   }
 }
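
The net effect of the hiveUdfs.scala change: the hand-rolled constructor-based wrappers are replaced by Hive's own ConversionHelper, so simple UDFs get the same implicit argument conversions Hive itself applies. A hedged sketch of the pattern, using only the Hive APIs the patch itself calls (`ConversionHelper`, `FunctionRegistry.invoke`); the `invokeSimpleUdf` helper and its parameters are illustrative:

```scala
import java.lang.reflect.Method

import org.apache.hadoop.hive.ql.exec.FunctionRegistry
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector

// Illustrative helper: given a UDF instance, its reflected evaluate method,
// and ObjectInspectors describing the actual argument types, let Hive coerce
// the arguments to the method's parameter types before the reflective call.
def invokeSimpleUdf(
    udf: AnyRef,
    method: Method,
    argInspectors: Array[ObjectInspector],
    args: Seq[AnyRef]): AnyRef = {
  val helper = new ConversionHelper(method, argInspectors)
  FunctionRegistry.invoke(method, udf, helper.convertIfNecessary(args: _*): _*)
}
```

The patch itself memoizes the ConversionHelper in a `@transient lazy val`, so it is built once per deserialized expression rather than once per row; the sketch above rebuilds it on every call only for brevity.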