From efa11a42f0c34dcfaf4a1bf17055539c43c8e4f9 Mon Sep 17 00:00:00 2001
From: Xiao Li <gatorsmile@gmail.com>
Date: Mon, 17 Apr 2017 15:59:55 +0800
Subject: [PATCH] [SPARK-20335][SQL][BACKPORT-2.1] Children expressions of Hive
 UDF impacts the determinism of Hive UDF

### What changes were proposed in this pull request?

This PR is to backport https://github.com/apache/spark/pull/17635 to Spark 2.1

---
```JAVA
  /**
   * Certain optimizations should not be applied if UDF is not deterministic.
   * Deterministic UDF returns same result each time it is invoked with a
   * particular input. This determinism just needs to hold within the context of
   * a query.
   *
   * return true if the UDF is deterministic
   */
  boolean deterministic() default true;
```

Based on the definition of [UDFType](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFType.java#L42-L50), when Hive UDF's children are non-deterministic, Hive UDF is also non-deterministic.

### How was this patch tested?
Added test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17652 from gatorsmile/backport-17635.
---
 .../org/apache/spark/sql/hive/hiveUDFs.scala  |  4 +--
 .../execution/AggregationQuerySuite.scala     | 13 ++++++++
 .../sql/hive/execution/HiveUDFSuite.scala     | 30 +++++++++++++++++++
 3 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 37414ad129..3e46b74613 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -42,7 +42,7 @@ private[hive] case class HiveSimpleUDF(
     name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
   extends Expression with HiveInspectors with CodegenFallback with Logging {
 
-  override def deterministic: Boolean = isUDFDeterministic
+  override def deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic)
 
   override def nullable: Boolean = true
 
@@ -120,7 +120,7 @@ private[hive] case class HiveGenericUDF(
 
   override def nullable: Boolean = true
 
-  override def deterministic: Boolean = isUDFDeterministic
+  override def deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic)
 
   override def foldable: Boolean =
     isUDFDeterministic && returnInspector.isInstanceOf[ConstantObjectInspector]
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 4a8086d7e5..84f915977b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -509,6 +509,19 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
         Row(null, null, 110.0, null, null, 10.0) :: Nil)
   }
 
+  test("non-deterministic children expressions of UDAF") {
+    val e = intercept[AnalysisException] {
+      spark.sql(
+        """
+          |SELECT mydoublesum(value + 1.5 * key + rand())
+          |FROM agg1
+          |GROUP BY key
+        """.stripMargin)
+    }.getMessage
+    assert(Seq("nondeterministic expression",
+      "should not appear in the arguments of an aggregate function").forall(e.contains))
+  }
+
   test("interpreted aggregate function") {
     checkAnswer(
       spark.sql(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 4098bb597b..78c80dacb9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
 import org.apache.hadoop.io.{LongWritable, Writable}
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.functions.max
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -338,6 +339,35 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
     hiveContext.reset()
   }
 
+  test("non-deterministic children of UDF") {
+    withUserDefinedFunction("testStringStringUDF" -> true, "testGenericUDFHash" -> true) {
+      // HiveSimpleUDF
+      sql(s"CREATE TEMPORARY FUNCTION testStringStringUDF AS '${classOf[UDFStringString].getName}'")
+      val df1 = sql("SELECT testStringStringUDF(rand(), \"hello\")")
+      assert(!df1.logicalPlan.asInstanceOf[Project].projectList.forall(_.deterministic))
+
+      // HiveGenericUDF
+      sql(s"CREATE TEMPORARY FUNCTION testGenericUDFHash AS '${classOf[GenericUDFHash].getName}'")
+      val df2 = sql("SELECT testGenericUDFHash(rand())")
+      assert(!df2.logicalPlan.asInstanceOf[Project].projectList.forall(_.deterministic))
+    }
+  }
+
+  test("non-deterministic children expressions of UDAF") {
+    withTempView("view1") {
+      spark.range(1).selectExpr("id as x", "id as y").createTempView("view1")
+      withUserDefinedFunction("testUDAFPercentile" -> true) {
+        // non-deterministic children of Hive UDAF
+        sql(s"CREATE TEMPORARY FUNCTION testUDAFPercentile AS '${classOf[UDAFPercentile].getName}'")
+        val e1 = intercept[AnalysisException] {
+          sql("SELECT testUDAFPercentile(x, rand()) from view1 group by y")
+        }.getMessage
+        assert(Seq("nondeterministic expression",
+          "should not appear in the arguments of an aggregate function").forall(e1.contains))
+      }
+    }
+  }
+
   test("Hive UDFs with insufficient number of input arguments should trigger an analysis error") {
     Seq((1, 2)).toDF("a", "b").createOrReplaceTempView("testUDF")
 
-- 
GitLab