Commit 02026a81 authored by Wenchen Fan, committed by Yin Huai

[SPARK-10671] [SQL] Throws an analysis exception if we cannot find Hive UDFs

Takes over https://github.com/apache/spark/pull/8800

Author: Wenchen Fan <cloud0fan@163.com>

Closes #8941 from cloud-fan/hive-udf.
parent 4d8c7c6d
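For context before the diff: the user-visible effect of this patch is that calling a Hive UDF with arguments it cannot accept now fails at analysis time with an AnalysisException, instead of surfacing later as a raw runtime error. A minimal sketch of the new behavior against the Spark 1.5-era API (the function name testUDFAnd and table testUDF are illustrative, mirroring the new test added below):

import org.apache.spark.sql.AnalysisException

// Assumes a HiveContext named `hiveContext` and an existing table `testUDF`;
// both names are illustrative.
hiveContext.sql(
  "CREATE TEMPORARY FUNCTION testUDFAnd AS " +
    "'org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd'")

try {
  // GenericUDFOPAnd requires arguments; calling it with none is invalid.
  hiveContext.sql("SELECT testUDFAnd() FROM testUDF")
} catch {
  case e: AnalysisException =>
    // After this patch the failure is reported during analysis:
    // "No handler for Hive udf ... because: ..."
    println(e.getMessage)
}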
@@ -60,20 +60,36 @@ private[hive] class HiveFunctionRegistry(underlying: analysis.FunctionRegistry)
     val functionClassName = functionInfo.getFunctionClass.getName
 
-    if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
-      HiveSimpleUDF(new HiveFunctionWrapper(functionClassName), children)
-    } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
-      HiveGenericUDF(new HiveFunctionWrapper(functionClassName), children)
-    } else if (
-      classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
-      HiveUDAFFunction(new HiveFunctionWrapper(functionClassName), children)
-    } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
-      HiveUDAFFunction(
-        new HiveFunctionWrapper(functionClassName), children, isUDAFBridgeRequired = true)
-    } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
-      HiveGenericUDTF(new HiveFunctionWrapper(functionClassName), children)
-    } else {
-      sys.error(s"No handler for udf ${functionInfo.getFunctionClass}")
+    // When we instantiate hive UDF wrapper class, we may throw exception if the input expressions
+    // don't satisfy the hive UDF, such as type mismatch, input number mismatch, etc. Here we
+    // catch the exception and throw AnalysisException instead.
+    try {
+      if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
+        HiveSimpleUDF(new HiveFunctionWrapper(functionClassName), children)
+      } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
+        HiveGenericUDF(new HiveFunctionWrapper(functionClassName), children)
+      } else if (
+        classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
+        HiveUDAFFunction(new HiveFunctionWrapper(functionClassName), children)
+      } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
+        HiveUDAFFunction(
+          new HiveFunctionWrapper(functionClassName), children, isUDAFBridgeRequired = true)
+      } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
+        val udtf = HiveGenericUDTF(new HiveFunctionWrapper(functionClassName), children)
+        udtf.elementTypes // Force it to check input data types.
+        udtf
+      } else {
+        throw new AnalysisException(s"No handler for udf ${functionInfo.getFunctionClass}")
+      }
+    } catch {
+      case analysisException: AnalysisException =>
+        // If the exception is an AnalysisException, just throw it.
+        throw analysisException
+      case throwable: Throwable =>
+        // If there is any other error, we throw an AnalysisException.
+        val errorMessage = s"No handler for Hive udf ${functionInfo.getFunctionClass} " +
+          s"because: ${throwable.getMessage}."
+        throw new AnalysisException(errorMessage)
     }
   }
 }
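The shape of this hunk is the heart of the patch: instantiating the wrapper classes (and explicitly forcing udtf.elementTypes) runs Hive's own argument checking, so wrapping the whole dispatch in try/catch is what converts arbitrary reflection and type errors into analysis errors. A self-contained toy sketch of the pattern, with stand-ins for the Spark types (AnalysisError here plays the role of AnalysisException):

// Toy stand-in for org.apache.spark.sql.AnalysisException.
class AnalysisError(message: String) extends Exception(message)

// Toy UDF wrapper whose constructor validates its inputs, the way the
// Hive wrappers do when they are instantiated.
class ToyUDF(args: Seq[Int]) {
  require(args.nonEmpty, "this UDF needs at least one argument")
}

def lookupFunction(name: String, args: Seq[Int]): ToyUDF =
  try {
    new ToyUDF(args) // may throw if the arguments don't fit
  } catch {
    case e: AnalysisError => throw e // already the right kind of error
    case t: Throwable =>
      // Anything else becomes an analysis-time error with a readable message.
      throw new AnalysisError(s"No handler for udf $name because: ${t.getMessage}.")
  }

// lookupFunction("toy", Seq.empty) now fails with
// "No handler for udf toy because: requirement failed: ...".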
@@ -134,7 +150,7 @@ private[hive] case class HiveSimpleUDF(funcWrapper: HiveFunctionWrapper, childre
   @transient
   private lazy val conversionHelper = new ConversionHelper(method, arguments)
 
-  val dataType = javaClassToDataType(method.getReturnType)
+  override val dataType = javaClassToDataType(method.getReturnType)
 
   @transient
   lazy val returnInspector = ObjectInspectorFactory.getReflectionObjectInspector(
...@@ -205,7 +221,7 @@ private[hive] case class HiveGenericUDF(funcWrapper: HiveFunctionWrapper, childr ...@@ -205,7 +221,7 @@ private[hive] case class HiveGenericUDF(funcWrapper: HiveFunctionWrapper, childr
new DeferredObjectAdapter(inspect, child.dataType) new DeferredObjectAdapter(inspect, child.dataType)
}.toArray[DeferredObject] }.toArray[DeferredObject]
lazy val dataType: DataType = inspectorToDataType(returnInspector) override val dataType: DataType = inspectorToDataType(returnInspector)
override def eval(input: InternalRow): Any = { override def eval(input: InternalRow): Any = {
returnInspector // Make sure initialized. returnInspector // Make sure initialized.
@@ -231,6 +247,12 @@ private[hive] case class HiveGenericUDF(funcWrapper: HiveFunctionWrapper, childr
  * Resolves [[UnresolvedWindowFunction]] to [[HiveWindowFunction]].
  */
 private[spark] object ResolveHiveWindowFunction extends Rule[LogicalPlan] {
+  private def shouldResolveFunction(
+      unresolvedWindowFunction: UnresolvedWindowFunction,
+      windowSpec: WindowSpecDefinition): Boolean = {
+    unresolvedWindowFunction.childrenResolved && windowSpec.childrenResolved
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case p: LogicalPlan if !p.childrenResolved => p
@@ -238,9 +260,11 @@ private[spark] object ResolveHiveWindowFunction extends Rule[LogicalPlan] {
       // replaced those WindowSpecReferences.
       case p: LogicalPlan =>
         p transformExpressions {
+          // We will not start to resolve the function unless all arguments are resolved
+          // and all expressions in window spec are fixed.
           case WindowExpression(
-            UnresolvedWindowFunction(name, children),
-            windowSpec: WindowSpecDefinition) =>
+            u @ UnresolvedWindowFunction(name, children),
+            windowSpec: WindowSpecDefinition) if shouldResolveFunction(u, windowSpec) =>
             // First, let's find the window function info.
             val windowFunctionInfo: WindowFunctionInfo =
               Option(FunctionRegistry.getWindowFunctionInfo(name.toLowerCase)).getOrElse(
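The new shouldResolveFunction guard follows a standard Catalyst idiom: a resolution rule should not fire until everything it depends on is itself resolved, otherwise it would inspect unresolved children. A self-contained toy sketch of the idiom (the types below are simplified stand-ins, not Catalyst's):

// Toy expressions: resolution only proceeds once all inputs are resolved.
sealed trait Expr { def resolved: Boolean }
case class Literal(value: Int) extends Expr { val resolved = true }
case class Unresolved(name: String) extends Expr { val resolved = false }
case class Func(name: String, children: Seq[Expr]) extends Expr {
  def childrenResolved: Boolean = children.forall(_.resolved)
  val resolved = false
}

// Mirror of shouldResolveFunction: fire the rule only when the function's
// arguments (and, in Spark, the window spec) are fully resolved; otherwise
// leave the node alone for a later pass of the fixed-point analyzer.
def resolveRule(e: Expr): Expr = e match {
  case f @ Func(_, children) if f.childrenResolved =>
    Literal(children.collect { case Literal(v) => v }.sum) // pretend resolution
  case other => other
}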
@@ -256,7 +280,7 @@ private[spark] object ResolveHiveWindowFunction extends Rule[LogicalPlan] {
             // are expressions in Order By clause.
             if (classOf[GenericUDAFRank].isAssignableFrom(functionClass)) {
               if (children.nonEmpty) {
-               throw new AnalysisException(s"$name does not take input parameters.")
+                throw new AnalysisException(s"$name does not take input parameters.")
               }
               windowSpec.orderSpec.map(_.child)
             } else {
@@ -358,7 +382,7 @@ private[hive] case class HiveWindowFunction(
     evaluator.init(GenericUDAFEvaluator.Mode.COMPLETE, inputInspectors)
   }
 
-  override def dataType: DataType =
+  override val dataType: DataType =
     if (!pivotResult) {
       inspectorToDataType(returnInspector)
     } else {
@@ -478,7 +502,7 @@ private[hive] case class HiveGenericUDTF(
   @transient
   protected lazy val collector = new UDTFCollector
 
-  lazy val elementTypes = outputInspector.getAllStructFieldRefs.asScala.map {
+  override lazy val elementTypes = outputInspector.getAllStructFieldRefs.asScala.map {
     field => (inspectorToDataType(field.getFieldObjectInspector), true)
   }
@@ -602,6 +626,6 @@ private[hive] case class HiveUDAFFunction(
   override def supportsPartial: Boolean = false
 
-  override lazy val dataType: DataType = inspectorToDataType(returnInspector)
+  override val dataType: DataType = inspectorToDataType(returnInspector)
 }
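A pattern runs through these hunks: dataType moves from def or lazy val to a strict val, so computing the result type (which runs Hive's argument and inspector checks) happens while the wrapper is being constructed, inside the registry's new try block, rather than later during execution. The UDTF keeps a lazy elementTypes, which is why the registry forces it explicitly. A toy sketch of why the timing matters (class and member names are illustrative):

class LazyCheck {
  lazy val dataType: String = sys.error("type mismatch") // only fails when first read
}

class EagerCheck {
  val dataType: String = sys.error("type mismatch") // fails in the constructor
}

// new LazyCheck() succeeds, and the error escapes to execution time;
// new EagerCheck() throws immediately, where lookupFunction's try/catch
// can convert it into an AnalysisException.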
@@ -21,7 +21,8 @@ import java.io.{DataInput, DataOutput}
 import java.util.{ArrayList, Arrays, Properties}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.ql.udf.generic.{GenericUDAFAverage, GenericUDF}
+import org.apache.hadoop.hive.ql.udf.UDAFPercentile
+import org.apache.hadoop.hive.ql.udf.generic.{GenericUDFOPAnd, GenericUDTFExplode, GenericUDAFAverage, GenericUDF}
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
@@ -299,6 +300,62 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton {
     hiveContext.reset()
   }
 
+  test("Hive UDFs with insufficient number of input arguments should trigger an analysis error") {
+    Seq((1, 2)).toDF("a", "b").registerTempTable("testUDF")
+
+    {
+      // HiveSimpleUDF
+      sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'")
+      val message = intercept[AnalysisException] {
+        sql("SELECT testUDFTwoListList() FROM testUDF")
+      }.getMessage
+      assert(message.contains("No handler for Hive udf"))
+      sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList")
+    }
+
+    {
+      // HiveGenericUDF
+      sql(s"CREATE TEMPORARY FUNCTION testUDFAnd AS '${classOf[GenericUDFOPAnd].getName}'")
+      val message = intercept[AnalysisException] {
+        sql("SELECT testUDFAnd() FROM testUDF")
+      }.getMessage
+      assert(message.contains("No handler for Hive udf"))
+      sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFAnd")
+    }
+
+    {
+      // Hive UDAF
+      sql(s"CREATE TEMPORARY FUNCTION testUDAFPercentile AS '${classOf[UDAFPercentile].getName}'")
+      val message = intercept[AnalysisException] {
+        sql("SELECT testUDAFPercentile(a) FROM testUDF GROUP BY b")
+      }.getMessage
+      assert(message.contains("No handler for Hive udf"))
+      sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFPercentile")
+    }
+
+    {
+      // AbstractGenericUDAFResolver
+      sql(s"CREATE TEMPORARY FUNCTION testUDAFAverage AS '${classOf[GenericUDAFAverage].getName}'")
+      val message = intercept[AnalysisException] {
+        sql("SELECT testUDAFAverage() FROM testUDF GROUP BY b")
+      }.getMessage
+      assert(message.contains("No handler for Hive udf"))
+      sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFAverage")
+    }
+
+    {
+      // Hive UDTF
+      sql(s"CREATE TEMPORARY FUNCTION testUDTFExplode AS '${classOf[GenericUDTFExplode].getName}'")
+      val message = intercept[AnalysisException] {
+        sql("SELECT testUDTFExplode() FROM testUDF")
+      }.getMessage
+      assert(message.contains("No handler for Hive udf"))
+      sql("DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode")
+    }
+
+    sqlContext.dropTempTable("testUDF")
+  }
+
 }
 
 class TestPair(x: Int, y: Int) extends Writable with Serializable {