Commit f481090a authored by navis.ryu, committed by Josh Rosen

[SPARK-10151][SQL] Support invocation of hive macro

A macro in Hive (a GenericUDFMacro) contains the actual function inside it, but that function is not conveyed to tasks, resulting in a null-pointer exception.
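
For context, a Hive macro is created and invoked roughly like this (a hypothetical session sketch; the SIGMOID macro and the src table are illustrative only, not taken from the patch):

    // Before this patch, the SELECT below could fail with an NPE on executors,
    // because the macro body never reached the tasks.
    sqlContext.sql("CREATE TEMPORARY MACRO SIGMOID (x DOUBLE) 1.0 / (1.0 + EXP(-x))")
    sqlContext.sql("SELECT SIGMOID(2.0) FROM src").collect()
    sqlContext.sql("DROP TEMPORARY MACRO SIGMOID")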

Author: navis.ryu <navis@apache.org>

Closes #8354 from navis/SPARK-10151.
parent dce2f8c9
Showing 27 additions and 8 deletions
@@ -684,6 +684,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     "load_file_with_space_in_the_name",
     "loadpart1",
     "louter_join_ppr",
+    "macro",
     "mapjoin_distinct",
     "mapjoin_filter_on_outerjoin",
     "mapjoin_mapjoin",

@@ -21,6 +21,7 @@ import java.io.File
 import java.net.{URL, URLClassLoader}
 import java.sql.Timestamp
 import java.util.concurrent.TimeUnit
+import java.util.regex.Pattern

 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
@@ -592,10 +593,14 @@ class HiveContext private[hive](
     )
   }

+  private def functionOrMacroDDLPattern(command: String) = Pattern.compile(
+    ".*(create|drop)\\s+(temporary\\s+)?(function|macro).+", Pattern.DOTALL).matcher(command)
+
   protected[hive] def runSqlHive(sql: String): Seq[String] = {
-    if (sql.toLowerCase.contains("create temporary function")) {
+    val command = sql.trim.toLowerCase
+    if (functionOrMacroDDLPattern(command).matches()) {
       executionHive.runSqlHive(sql)
-    } else if (sql.trim.toLowerCase.startsWith("set")) {
+    } else if (command.startsWith("set")) {
       metadataHive.runSqlHive(sql)
       executionHive.runSqlHive(sql)
     } else {
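
As a standalone sketch of the routing change above (not the patched class itself), the new pattern sends function and macro DDL alike, CREATE and DROP, to the execution Hive client, where the old check only caught "create temporary function":

    import java.util.regex.Pattern

    val ddl = Pattern.compile(
      ".*(create|drop)\\s+(temporary\\s+)?(function|macro).+", Pattern.DOTALL)

    // Mirrors the predicate used in runSqlHive.
    def routedToExecutionHive(sql: String): Boolean =
      ddl.matcher(sql.trim.toLowerCase).matches()

    routedToExecutionHive("CREATE TEMPORARY MACRO M (x INT) x + 1")  // true
    routedToExecutionHive("DROP TEMPORARY FUNCTION f")               // true
    routedToExecutionHive("SET hive.cli.print.header=true")          // false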
@@ -723,6 +723,10 @@ private[hive] trait HiveInspectors {
       inspectorToDataType(m.getMapValueObjectInspector))
     case _: WritableStringObjectInspector => StringType
     case _: JavaStringObjectInspector => StringType
+    case _: WritableHiveVarcharObjectInspector => StringType
+    case _: JavaHiveVarcharObjectInspector => StringType
+    case _: WritableHiveCharObjectInspector => StringType
+    case _: JavaHiveCharObjectInspector => StringType
     case _: WritableIntObjectInspector => IntegerType
     case _: JavaIntObjectInspector => IntegerType
     case _: WritableDoubleObjectInspector => DoubleType

@@ -117,6 +117,7 @@ private[hive] object HiveQl extends Logging {
     "TOK_CREATEDATABASE",
     "TOK_CREATEFUNCTION",
     "TOK_CREATEINDEX",
+    "TOK_CREATEMACRO",
     "TOK_CREATEROLE",
     "TOK_DESCDATABASE",
@@ -125,6 +126,7 @@ private[hive] object HiveQl extends Logging {
     "TOK_DROPDATABASE",
     "TOK_DROPFUNCTION",
     "TOK_DROPINDEX",
+    "TOK_DROPMACRO",
     "TOK_DROPROLE",
     "TOK_DROPTABLE_PROPERTIES",
     "TOK_DROPVIEW",

@@ -117,9 +117,10 @@ private[hive] object HiveShim {
    * Detail discussion can be found at https://github.com/apache/spark/pull/3640
    *
    * @param functionClassName UDF class name
+   * @param instance optional UDF instance which contains additional information (for macro)
    */
-  private[hive] case class HiveFunctionWrapper(var functionClassName: String)
-    extends java.io.Externalizable {
+  private[hive] case class HiveFunctionWrapper(var functionClassName: String,
+    private var instance: AnyRef = null) extends java.io.Externalizable {

     // for Serialization
     def this() = this(null)
@@ -154,8 +155,6 @@ private[hive] object HiveShim {
       serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
     }

-    private var instance: AnyRef = null
-
     def writeExternal(out: java.io.ObjectOutput) {
       // output the function name
       out.writeUTF(functionClassName)
@@ -184,7 +183,7 @@ private[hive] object HiveShim {
       // read the function in bytes
       val functionInBytesLength = in.readInt()
       val functionInBytes = new Array[Byte](functionInBytesLength)
-      in.read(functionInBytes, 0, functionInBytesLength)
+      in.readFully(functionInBytes)
       // deserialize the function object via Hive Utilities
       instance = deserializePlan[AnyRef](new java.io.ByteArrayInputStream(functionInBytes),
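
The read-to-readFully change also fixes a latent partial-read bug: read may legally return fewer bytes than requested, while DataInput.readFully keeps reading until the buffer is full or throws EOFException. A minimal illustration (hypothetical stream, not the Spark code):

    import java.io.{ByteArrayInputStream, DataInputStream}

    val in = new DataInputStream(new ByteArrayInputStream(new Array[Byte](1024)))
    val buf = new Array[Byte](1024)
    // read(buf, 0, 1024) is allowed to stop early; readFully guarantees
    // all 1024 bytes are read, or an EOFException is thrown.
    in.readFully(buf)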
@@ -64,7 +64,10 @@ private[hive] class HiveFunctionRegistry(underlying: analysis.FunctionRegistry)
     // don't satisfy the hive UDF, such as type mismatch, input number mismatch, etc. Here we
     // catch the exception and throw AnalysisException instead.
     try {
-      if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
+      if (classOf[GenericUDFMacro].isAssignableFrom(functionInfo.getFunctionClass)) {
+        HiveGenericUDF(
+          new HiveFunctionWrapper(functionClassName, functionInfo.getGenericUDF), children)
+      } else if (classOf[UDF].isAssignableFrom(functionInfo.getFunctionClass)) {
         HiveSimpleUDF(new HiveFunctionWrapper(functionClassName), children)
       } else if (classOf[GenericUDF].isAssignableFrom(functionInfo.getFunctionClass)) {
         HiveGenericUDF(new HiveFunctionWrapper(functionClassName), children)
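
Note the ordering in the dispatch above: GenericUDFMacro is itself a GenericUDF, so the macro check must come first, and the wrapper is handed the live getGenericUDF instance (which carries the macro body) rather than only the class name. A simplified sketch of that shape (assumed Hive imports; not the registry code itself):

    import org.apache.hadoop.hive.ql.exec.UDF
    import org.apache.hadoop.hive.ql.udf.generic.{GenericUDF, GenericUDFMacro}

    // Most specific class first, otherwise a macro would be treated
    // as a plain GenericUDF and lose its captured instance.
    def kind(clazz: Class[_]): String =
      if (classOf[GenericUDFMacro].isAssignableFrom(clazz)) "macro (keep instance)"
      else if (classOf[UDF].isAssignableFrom(clazz)) "simple UDF"
      else if (classOf[GenericUDF].isAssignableFrom(clazz)) "generic UDF"
      else "other"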