Skip to content
Snippets Groups Projects
Commit efa11a42 authored by Xiao Li's avatar Xiao Li Committed by Wenchen Fan
Browse files

[SPARK-20335][SQL][BACKPORT-2.1] Children expressions of Hive UDF impacts the...

[SPARK-20335][SQL][BACKPORT-2.1] Children expressions of Hive UDF impacts the determinism of Hive UDF

### What changes were proposed in this pull request?

This PR is to backport https://github.com/apache/spark/pull/17635 to Spark 2.1

---
```JAVA
  /**
   * Certain optimizations should not be applied if UDF is not deterministic.
   * Deterministic UDF returns same result each time it is invoked with a
   * particular input. This determinism just needs to hold within the context of
   * a query.
   *
   * return true if the UDF is deterministic
   */
  boolean deterministic() default true;
```

Based on the definition of [UDFType](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFType.java#L42-L50), when Hive UDF's children are non-deterministic, Hive UDF is also non-deterministic.

### How was this patch tested?
Added test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17652 from gatorsmile/backport-17635.
parent 2a3e50e2
No related branches found
No related tags found
No related merge requests found
......@@ -42,7 +42,7 @@ private[hive] case class HiveSimpleUDF(
name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
extends Expression with HiveInspectors with CodegenFallback with Logging {
override def deterministic: Boolean = isUDFDeterministic
override def deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic)
override def nullable: Boolean = true
......@@ -120,7 +120,7 @@ private[hive] case class HiveGenericUDF(
override def nullable: Boolean = true
override def deterministic: Boolean = isUDFDeterministic
override def deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic)
override def foldable: Boolean =
isUDFDeterministic && returnInspector.isInstanceOf[ConstantObjectInspector]
......
......@@ -509,6 +509,19 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
Row(null, null, 110.0, null, null, 10.0) :: Nil)
}
test("non-deterministic children expressions of UDAF") {
val e = intercept[AnalysisException] {
spark.sql(
"""
|SELECT mydoublesum(value + 1.5 * key + rand())
|FROM agg1
|GROUP BY key
""".stripMargin)
}.getMessage
assert(Seq("nondeterministic expression",
"should not appear in the arguments of an aggregate function").forall(e.contains))
}
test("interpreted aggregate function") {
checkAnswer(
spark.sql(
......
......@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
import org.apache.hadoop.io.{LongWritable, Writable}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
......@@ -338,6 +339,35 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
hiveContext.reset()
}
test("non-deterministic children of UDF") {
withUserDefinedFunction("testStringStringUDF" -> true, "testGenericUDFHash" -> true) {
// HiveSimpleUDF
sql(s"CREATE TEMPORARY FUNCTION testStringStringUDF AS '${classOf[UDFStringString].getName}'")
val df1 = sql("SELECT testStringStringUDF(rand(), \"hello\")")
assert(!df1.logicalPlan.asInstanceOf[Project].projectList.forall(_.deterministic))
// HiveGenericUDF
sql(s"CREATE TEMPORARY FUNCTION testGenericUDFHash AS '${classOf[GenericUDFHash].getName}'")
val df2 = sql("SELECT testGenericUDFHash(rand())")
assert(!df2.logicalPlan.asInstanceOf[Project].projectList.forall(_.deterministic))
}
}
test("non-deterministic children expressions of UDAF") {
withTempView("view1") {
spark.range(1).selectExpr("id as x", "id as y").createTempView("view1")
withUserDefinedFunction("testUDAFPercentile" -> true) {
// non-deterministic children of Hive UDAF
sql(s"CREATE TEMPORARY FUNCTION testUDAFPercentile AS '${classOf[UDAFPercentile].getName}'")
val e1 = intercept[AnalysisException] {
sql("SELECT testUDAFPercentile(x, rand()) from view1 group by y")
}.getMessage
assert(Seq("nondeterministic expression",
"should not appear in the arguments of an aggregate function").forall(e1.contains))
}
}
}
test("Hive UDFs with insufficient number of input arguments should trigger an analysis error") {
Seq((1, 2)).toDF("a", "b").createOrReplaceTempView("testUDF")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment