Skip to content
Snippets Groups Projects
Commit d6cd3a18 authored by Davies Liu's avatar Davies Liu Committed by Davies Liu
Browse files

[SPARK-11599] [SQL] fix NPE when resolve Hive UDF in SQLParser

The DataFrame APIs that take a SQL expression always use SQLParser, so the HiveFunctionRegistry will be called outside of Hive state, causing an NPE if there is no active SessionState for the current thread (in PySpark).

cc rxin yhuai

Author: Davies Liu <davies@databricks.com>

Closes #9576 from davies/hive_udf.
parent c4e19b38
No related branches found
No related tags found
No related merge requests found
......@@ -454,7 +454,15 @@ class HiveContext private[hive](
// Note that HiveUDFs will be overridden by functions registered in this context.
@transient
override protected[sql] lazy val functionRegistry: FunctionRegistry =
  new HiveFunctionRegistry(FunctionRegistry.builtin.copy()) {
    // Wrap every lookup in the Hive thread-local session state: when this
    // registry is reached from SQLParser on a thread with no active
    // SessionState (e.g. PySpark), a bare lookup would NPE inside Hive.
    override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
      // Hive Registry needs the current database to look up a function.
      // TODO: the current database of executionHive should be consistent with metadataHive
      executionHive.withHiveState {
        super.lookupFunction(name, children)
      }
    }
  }
// The Hive UDF current_database() is foldable, will be evaluated by optimizer, but the optimizer
// can't access the SessionState of metadataHive.
......
......@@ -20,22 +20,19 @@ package org.apache.spark.sql.hive.execution
import java.io.File
import java.util.{Locale, TimeZone}
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin
import scala.util.Try
import org.scalatest.BeforeAndAfter
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkFiles, SparkException}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHiveContext
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.{SparkException, SparkFiles}
case class TestData(a: Int, b: String)
......@@ -1237,6 +1234,26 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
}
test("lookup hive UDF in another thread") {
  // Resolving an unknown function on the calling thread must surface an
  // AnalysisException (not an NPE) with a clear "undefined function" message.
  val mainThreadError = intercept[AnalysisException] {
    range(1).selectExpr("not_a_udf()")
  }
  assert(mainThreadError.getMessage.contains("undefined function not_a_udf"))

  // Repeat the lookup from a freshly spawned thread, which starts without an
  // attached Hive session state; it must fail the same graceful way.
  var sawExpectedError = false
  val worker = new Thread("test") {
    override def run(): Unit = {
      val workerError = intercept[AnalysisException] {
        range(1).selectExpr("not_a_udf()")
      }
      assert(workerError.getMessage.contains("undefined function not_a_udf"))
      sawExpectedError = true
    }
  }
  worker.start()
  // join() gives a happens-before edge, so reading the flag below is safe.
  worker.join()
  assert(sawExpectedError)
}
// NOTE(review): createQueryTest comes from HiveComparisonTest (not visible here);
// presumably it runs the query and compares against a recorded answer — confirm.
createQueryTest("select from thrift based table",
"SELECT * from src_thrift")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment