Skip to content
Snippets Groups Projects
Commit 418ad83f authored by Cheng Hao's avatar Cheng Hao Committed by Michael Armbrust
Browse files

[SPARK-3911] [SQL] HiveSimpleUdf can not be optimized in constant folding

```
explain extended select cos(null) from src limit 1;
```
outputs:
```
 Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#5]
  MetastoreRelation default, src, None

== Optimized Logical Plan ==
Limit 1
 Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#5]
  MetastoreRelation default, src, None

== Physical Plan ==
Limit 1
 Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#5]
  HiveTableScan [], (MetastoreRelation default, src, None), None
```
After patching this PR it outputs
```
== Parsed Logical Plan ==
Limit 1
 Project ['cos(null) AS c_0#0]
  UnresolvedRelation None, src, None

== Analyzed Logical Plan ==
Limit 1
 Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#0]
  MetastoreRelation default, src, None

== Optimized Logical Plan ==
Limit 1
 Project [null AS c_0#0]
  MetastoreRelation default, src, None

== Physical Plan ==
Limit 1
 Project [null AS c_0#0]
  HiveTableScan [], (MetastoreRelation default, src, None), None
```

Author: Cheng Hao <hao.cheng@intel.com>

Closes #2771 from chenghao-intel/hive_udf_constant_folding and squashes the following commits:

1379c73 [Cheng Hao] duplicate the PlanTest with catalyst/plans/PlanTest
1e52dda [Cheng Hao] add unit test for hive simple udf constant folding
01609ff [Cheng Hao] support constant folding for HiveSimpleUdf
parent 7e3a1ada
No related branches found
No related tags found
No related merge requests found
......@@ -33,7 +33,8 @@ class PlanTest extends FunSuite {
* we must normalize them to check if two different queries are identical.
*/
protected def normalizeExprIds(plan: LogicalPlan) = {
val minId = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id)).min
val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id))
val minId = if (list.isEmpty) 0 else list.min
plan transformAllExpressions {
case a: AttributeReference =>
AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
......
......@@ -99,6 +99,16 @@ private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[
@transient
protected lazy val arguments = children.map(c => toInspector(c.dataType)).toArray
@transient
protected lazy val isUDFDeterministic = {
val udfType = function.getClass().getAnnotation(classOf[HiveUDFType])
udfType != null && udfType.deterministic()
}
override def foldable = {
isUDFDeterministic && children.foldLeft(true)((prev, n) => prev && n.foldable)
}
// Create parameter converters
@transient
protected lazy val conversionHelper = new ConversionHelper(method, arguments)
......
......@@ -19,6 +19,8 @@ package org.apache.spark.sql
import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.expressions.{ExprId, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
......@@ -29,7 +31,7 @@ import org.apache.spark.sql.catalyst.util._
* It is hard to have maven allow one subproject depend on another subprojects test code.
* So, we duplicate this code here.
*/
class QueryTest extends FunSuite {
class QueryTest extends PlanTest {
/**
* Runs the plan and makes sure the answer matches the expected result.
* @param rdd the [[SchemaRDD]] to be executed
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.plans
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.scalatest.FunSuite
/**
* *** DUPLICATED FROM sql/catalyst/plans. ***
*
* It is hard to have maven allow one subproject depend on another subprojects test code.
* So, we duplicate this code here.
*/
class PlanTest extends FunSuite {
/**
* Since attribute references are given globally unique ids during analysis,
* we must normalize them to check if two different queries are identical.
*/
protected def normalizeExprIds(plan: LogicalPlan) = {
val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id))
val minId = if (list.isEmpty) 0 else list.min
plan transformAllExpressions {
case a: AttributeReference =>
AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
}
}
/** Fails the test if the two plans do not match */
protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
val normalized1 = normalizeExprIds(plan1)
val normalized2 = normalizeExprIds(plan2)
if (normalized1 != normalized2)
fail(
s"""
|== FAIL: Plans do not match ===
|${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")}
""".stripMargin)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.hive.test.TestHive
class HivePlanTest extends QueryTest {
import TestHive._
test("udf constant folding") {
val optimized = sql("SELECT cos(null) FROM src").queryExecution.optimizedPlan
val correctAnswer = sql("SELECT cast(null as double) FROM src").queryExecution.optimizedPlan
comparePlans(optimized, correctAnswer)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment