diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index 03b89221ef2d3a8b27706ba27daa2c3bb1e37e07..5deb2f81d1738f11018c1da349bc078b6fb901c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -1029,8 +1029,11 @@ case class ScalaUDF(
     // such as IntegerType, its javaType is `int` and the returned type of user-defined
     // function is Object. Trying to convert an Object to `int` will cause casting exception.
     val evalCode = evals.map(_.code).mkString
-    val funcArguments = converterTerms.zip(evals).map {
-      case (converter, eval) => s"$converter.apply(${eval.value})"
+    val funcArguments = converterTerms.zipWithIndex.map {
+      case (converter, i) =>
+        val eval = evals(i)
+        val dt = children(i).dataType
+        s"$converter.apply(${eval.isNull} ? null : (${ctx.boxedType(dt)}) ${eval.value})"
     }.mkString(",")
     val callFunc = s"${ctx.boxedType(ctx.javaType(dataType))} $resultTerm = " +
       s"(${ctx.boxedType(ctx.javaType(dataType))})${catalystConverterTerm}" +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 605a6549dd68659712046c789e617722e6fe8379..8887dc68a50e71ca1e0e20af634330188f06b2d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1138,14 +1138,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-11725: correctly handle null inputs for ScalaUDF") {
-    val df = Seq(
+    val df = sparkContext.parallelize(Seq(
       new java.lang.Integer(22) -> "John",
-      null.asInstanceOf[java.lang.Integer] -> "Lucy").toDF("age", "name")
+      null.asInstanceOf[java.lang.Integer] -> "Lucy")).toDF("age", "name")
 
+    // passing null into the UDF that could handle it
     val boxedUDF = udf[java.lang.Integer, java.lang.Integer] {
-      (i: java.lang.Integer) => if (i == null) null else i * 2
+      (i: java.lang.Integer) => if (i == null) -10 else i * 2
     }
-    checkAnswer(df.select(boxedUDF($"age")), Row(44) :: Row(null) :: Nil)
+    checkAnswer(df.select(boxedUDF($"age")), Row(44) :: Row(-10) :: Nil)
 
     val primitiveUDF = udf((i: Int) => i * 2)
     checkAnswer(df.select(primitiveUDF($"age")), Row(44) :: Row(null) :: Nil)