diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala index ec56fe7729c2a31f9d26f714a20506586832e0f4..57f7a80bedc6cf6bd579b649b46e2b612f21ad1f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala @@ -44,6 +44,3 @@ class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[Tabl class FunctionAlreadyExistsException(db: String, func: String) extends AnalysisException(s"Function '$func' already exists in database '$db'") - -class TempFunctionAlreadyExistsException(func: String) - extends AnalysisException(s"Temporary function '$func' already exists") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index faedf5f91c3ef34b11505783581fdc2ce14d85db..1417bccf657cd0da04c15b8ee141920fe51bb1a4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1050,7 +1050,7 @@ class SessionCatalog( * * This performs reflection to decide what type of [[Expression]] to return in the builder. */ - def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = { + protected def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = { // TODO: at least support UDAFs here throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.") } @@ -1064,18 +1064,20 @@ class SessionCatalog( } /** - * Create a temporary function. - * This assumes no database is specified in `funcDefinition`. + * Registers a temporary or permanent function into a session-specific [[FunctionRegistry]] */ - def createTempFunction( - name: String, - info: ExpressionInfo, - funcDefinition: FunctionBuilder, - ignoreIfExists: Boolean): Unit = { - if (functionRegistry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists) { - throw new TempFunctionAlreadyExistsException(name) + def registerFunction( + funcDefinition: CatalogFunction, + ignoreIfExists: Boolean, + functionBuilder: Option[FunctionBuilder] = None): Unit = { + val func = funcDefinition.identifier + if (functionRegistry.functionExists(func.unquotedString) && !ignoreIfExists) { + throw new AnalysisException(s"Function $func already exists") } - functionRegistry.registerFunction(name, info, funcDefinition) + val info = new ExpressionInfo(funcDefinition.className, func.database.orNull, func.funcName) + val builder = + functionBuilder.getOrElse(makeFunctionBuilder(func.unquotedString, funcDefinition.className)) + functionRegistry.registerFunction(func.unquotedString, info, builder) } /** @@ -1180,12 +1182,7 @@ class SessionCatalog( // catalog. So, it is possible that qualifiedName is not exactly the same as // catalogFunction.identifier.unquotedString (difference is on case-sensitivity). // At here, we preserve the input from the user. - val info = new ExpressionInfo( - catalogFunction.className, - qualifiedName.database.orNull, - qualifiedName.funcName) - val builder = makeFunctionBuilder(qualifiedName.unquotedString, catalogFunction.className) - createTempFunction(qualifiedName.unquotedString, info, builder, ignoreIfExists = false) + registerFunction(catalogFunction.copy(identifier = qualifiedName), ignoreIfExists = false) // Now, we need to create the Expression. functionRegistry.lookupFunction(qualifiedName.unquotedString, children) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 9ba846fb252790b1758681d37313e86a8c963d8e..be8903000a0d13ffc094526c6e6af77046aab820 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -1162,10 +1162,10 @@ abstract class SessionCatalogSuite extends PlanTest { withBasicCatalog { catalog => val tempFunc1 = (e: Seq[Expression]) => e.head val tempFunc2 = (e: Seq[Expression]) => e.last - val info1 = new ExpressionInfo("tempFunc1", "temp1") - val info2 = new ExpressionInfo("tempFunc2", "temp2") - catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false) - catalog.createTempFunction("temp2", info2, tempFunc2, ignoreIfExists = false) + catalog.registerFunction( + newFunc("temp1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc1)) + catalog.registerFunction( + newFunc("temp2", None), ignoreIfExists = false, functionBuilder = Some(tempFunc2)) val arguments = Seq(Literal(1), Literal(2), Literal(3)) assert(catalog.lookupFunction(FunctionIdentifier("temp1"), arguments) === Literal(1)) assert(catalog.lookupFunction(FunctionIdentifier("temp2"), arguments) === Literal(3)) @@ -1174,13 +1174,15 @@ abstract class SessionCatalogSuite extends PlanTest { catalog.lookupFunction(FunctionIdentifier("temp3"), arguments) } val tempFunc3 = (e: Seq[Expression]) => Literal(e.size) - val info3 = new ExpressionInfo("tempFunc3", "temp1") // Temporary function already exists - intercept[TempFunctionAlreadyExistsException] { - catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = false) - } + val e = intercept[AnalysisException] { + catalog.registerFunction( + newFunc("temp1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc3)) + }.getMessage + assert(e.contains("Function temp1 already exists")) // Temporary function is overridden - catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = true) + catalog.registerFunction( + newFunc("temp1", None), ignoreIfExists = true, functionBuilder = Some(tempFunc3)) assert( catalog.lookupFunction( FunctionIdentifier("temp1"), arguments) === Literal(arguments.length)) @@ -1193,8 +1195,8 @@ abstract class SessionCatalogSuite extends PlanTest { assert(!catalog.isTemporaryFunction(FunctionIdentifier("temp1"))) val tempFunc1 = (e: Seq[Expression]) => e.head - val info1 = new ExpressionInfo("tempFunc1", "temp1") - catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = false) + catalog.registerFunction( + newFunc("temp1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc1)) // Returns true when the function is temporary assert(catalog.isTemporaryFunction(FunctionIdentifier("temp1"))) @@ -1243,9 +1245,9 @@ abstract class SessionCatalogSuite extends PlanTest { test("drop temp function") { withBasicCatalog { catalog => - val info = new ExpressionInfo("tempFunc", "func1") val tempFunc = (e: Seq[Expression]) => e.head - catalog.createTempFunction("func1", info, tempFunc, ignoreIfExists = false) + catalog.registerFunction( + newFunc("func1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc)) val arguments = Seq(Literal(1), Literal(2), Literal(3)) assert(catalog.lookupFunction(FunctionIdentifier("func1"), arguments) === Literal(1)) catalog.dropTempFunction("func1", ignoreIfNotExists = false) @@ -1284,9 +1286,9 @@ abstract class SessionCatalogSuite extends PlanTest { test("lookup temp function") { withBasicCatalog { catalog => - val info1 = new ExpressionInfo("tempFunc1", "func1") val tempFunc1 = (e: Seq[Expression]) => e.head - catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false) + catalog.registerFunction( + newFunc("func1", None), ignoreIfExists = false, functionBuilder = Some(tempFunc1)) assert(catalog.lookupFunction( FunctionIdentifier("func1"), Seq(Literal(1), Literal(2), Literal(3))) == Literal(1)) catalog.dropTempFunction("func1", ignoreIfNotExists = false) @@ -1298,14 +1300,14 @@ abstract class SessionCatalogSuite extends PlanTest { test("list functions") { withBasicCatalog { catalog => - val info1 = new ExpressionInfo("tempFunc1", "func1") - val info2 = new ExpressionInfo("tempFunc2", "yes_me") + val funcMeta1 = newFunc("func1", None) + val funcMeta2 = newFunc("yes_me", None) val tempFunc1 = (e: Seq[Expression]) => e.head val tempFunc2 = (e: Seq[Expression]) => e.last catalog.createFunction(newFunc("func2", Some("db2")), ignoreIfExists = false) catalog.createFunction(newFunc("not_me", Some("db2")), ignoreIfExists = false) - catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = false) - catalog.createTempFunction("yes_me", info2, tempFunc2, ignoreIfExists = false) + catalog.registerFunction(funcMeta1, ignoreIfExists = false, functionBuilder = Some(tempFunc1)) + catalog.registerFunction(funcMeta2, ignoreIfExists = false, functionBuilder = Some(tempFunc2)) assert(catalog.listFunctions("db1", "*").map(_._1).toSet == Set(FunctionIdentifier("func1"), FunctionIdentifier("yes_me"))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 5687f9332430ea0afe4d9819bb1388ae1a02480e..e0d002936957672584215f8bbf8580b1de285c95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -51,6 +51,7 @@ case class CreateFunctionCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog + val func = CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources) if (isTemp) { if (databaseName.isDefined) { throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION " + @@ -59,17 +60,13 @@ case class CreateFunctionCommand( // We first load resources and then put the builder in the function registry. // Please note that it is allowed to overwrite an existing temp function. catalog.loadFunctionResources(resources) - val info = new ExpressionInfo(className, functionName) - val builder = catalog.makeFunctionBuilder(functionName, className) - catalog.createTempFunction(functionName, info, builder, ignoreIfExists = false) + catalog.registerFunction(func, ignoreIfExists = false) } else { // For a permanent, we will store the metadata into underlying external catalog. // This function will be loaded into the FunctionRegistry when a query uses it. // We do not load it into FunctionRegistry right now. // TODO: should we also parse "IF NOT EXISTS"? - catalog.createFunction( - CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources), - ignoreIfExists = false) + catalog.createFunction(func, ignoreIfExists = false) } Seq.empty[Row] } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 6469e501c1f68bfe3828aa29c4abb1686910d1c1..8f9c52cb1e031062e0cd75379879eb1f8a382e60 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -75,9 +75,10 @@ class CatalogSuite } private def createTempFunction(name: String): Unit = { - val info = new ExpressionInfo("className", name) val tempFunc = (e: Seq[Expression]) => e.head - sessionCatalog.createTempFunction(name, info, tempFunc, ignoreIfExists = false) + val funcMeta = CatalogFunction(FunctionIdentifier(name, None), "className", Nil) + sessionCatalog.registerFunction( + funcMeta, ignoreIfExists = false, functionBuilder = Some(tempFunc)) } private def dropFunction(name: String, db: Option[String] = None): Unit = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index c917f110b90f2673271fdf446154b2a4a48b4de6..377d4f2473c5882a541db673228e7ad5bdd984a6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -31,8 +31,8 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder -import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} -import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo} +import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions.{Cast, Expression} import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper import org.apache.spark.sql.internal.SQLConf @@ -124,13 +124,6 @@ private[sql] class HiveSessionCatalog( } private def lookupFunction0(name: FunctionIdentifier, children: Seq[Expression]): Expression = { - // TODO: Once lookupFunction accepts a FunctionIdentifier, we should refactor this method to - // if (super.functionExists(name)) { - // super.lookupFunction(name, children) - // } else { - // // This function is a Hive builtin function. - // ... - // } val database = name.database.map(formatDatabaseName) val funcName = name.copy(database = database) Try(super.lookupFunction(funcName, children)) match { @@ -164,10 +157,11 @@ private[sql] class HiveSessionCatalog( } } val className = functionInfo.getFunctionClass.getName - val builder = makeFunctionBuilder(functionName, className) + val functionIdentifier = + FunctionIdentifier(functionName.toLowerCase(Locale.ROOT), database) + val func = CatalogFunction(functionIdentifier, className, Nil) // Put this Hive built-in function to our function registry. - val info = new ExpressionInfo(className, functionName) - createTempFunction(functionName, info, builder, ignoreIfExists = false) + registerFunction(func, ignoreIfExists = false) // Now, we need to create the Expression. functionRegistry.lookupFunction(functionName, children) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala index 197110f4912a7bdf7fd4c7e6c9a517f1a1d22c31..73383ae4d4118abc9a7d4214dfc4fb8f2976428b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala @@ -22,7 +22,9 @@ import scala.concurrent.duration._ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox import org.apache.spark.sql.Column -import org.apache.spark.sql.catalyst.expressions.{ExpressionInfo, Literal} +import org.apache.spark.sql.catalyst.FunctionIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogFunction +import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile import org.apache.spark.sql.hive.HiveSessionCatalog import org.apache.spark.sql.hive.execution.TestingTypedCount @@ -217,9 +219,9 @@ class ObjectHashAggregateExecBenchmark extends BenchmarkBase with TestHiveSingle private def registerHiveFunction(functionName: String, clazz: Class[_]): Unit = { val sessionCatalog = sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog] - val builder = sessionCatalog.makeFunctionBuilder(functionName, clazz.getName) - val info = new ExpressionInfo(clazz.getName, functionName) - sessionCatalog.createTempFunction(functionName, info, builder, ignoreIfExists = false) + val functionIdentifier = FunctionIdentifier(functionName, database = None) + val func = CatalogFunction(functionIdentifier, clazz.getName, resources = Nil) + sessionCatalog.registerFunction(func, ignoreIfExists = false) } private def percentile_approx(