Skip to content
Snippets Groups Projects
Commit 82bb7fd4 authored by baishuo's avatar baishuo Committed by Cheng Lian
Browse files

[SPARK-6505] [SQL] Remove the reflection call in HiveFunctionWrapper

according liancheng‘s  comment in https://issues.apache.org/jira/browse/SPARK-6505,  this patch remove the  reflection call in HiveFunctionWrapper, and implement the functions named "deserializeObjectByKryo" and "serializeObjectByKryo" according the functions with the save name in
org.apache.hadoop.hive.ql.exec.Utilities.java

Author: baishuo <vc_java@hotmail.com>

Closes #5660 from baishuo/SPARK-6505-20150423 and squashes the following commits:

ae61ec4 [baishuo] modify code style
78d9fa3 [baishuo] modify code style
0b522a7 [baishuo] modify code style
a5ff9c7 [baishuo] Remove the reflection call in HiveFunctionWrapper
parent d188b8ba
No related branches found
No related tags found
No related merge requests found
...@@ -19,11 +19,15 @@ package org.apache.spark.sql.hive ...@@ -19,11 +19,15 @@ package org.apache.spark.sql.hive
import java.rmi.server.UID import java.rmi.server.UID
import java.util.{Properties, ArrayList => JArrayList} import java.util.{Properties, ArrayList => JArrayList}
import java.io.{OutputStream, InputStream}
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import scala.language.implicitConversions import scala.language.implicitConversions
import scala.reflect.ClassTag
import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.common.StatsSetupConst
...@@ -46,6 +50,7 @@ import org.apache.hadoop.{io => hadoopIo} ...@@ -46,6 +50,7 @@ import org.apache.hadoop.{io => hadoopIo}
import org.apache.spark.Logging import org.apache.spark.Logging
import org.apache.spark.sql.types.{Decimal, DecimalType, UTF8String} import org.apache.spark.sql.types.{Decimal, DecimalType, UTF8String}
import org.apache.spark.util.Utils._
/** /**
* This class provides the UDF creation and also the UDF instance serialization and * This class provides the UDF creation and also the UDF instance serialization and
...@@ -61,39 +66,34 @@ private[hive] case class HiveFunctionWrapper(var functionClassName: String) ...@@ -61,39 +66,34 @@ private[hive] case class HiveFunctionWrapper(var functionClassName: String)
// for Serialization // for Serialization
def this() = this(null) def this() = this(null)
import org.apache.spark.util.Utils._
@transient @transient
private val methodDeSerialize = { def deserializeObjectByKryo[T: ClassTag](
val method = classOf[Utilities].getDeclaredMethod( kryo: Kryo,
"deserializeObjectByKryo", in: InputStream,
classOf[Kryo], clazz: Class[_]): T = {
classOf[java.io.InputStream], val inp = new Input(in)
classOf[Class[_]]) val t: T = kryo.readObject(inp,clazz).asInstanceOf[T]
method.setAccessible(true) inp.close()
t
method
} }
@transient @transient
private val methodSerialize = { def serializeObjectByKryo(
val method = classOf[Utilities].getDeclaredMethod( kryo: Kryo,
"serializeObjectByKryo", plan: Object,
classOf[Kryo], out: OutputStream ) {
classOf[Object], val output: Output = new Output(out)
classOf[java.io.OutputStream]) kryo.writeObject(output, plan)
method.setAccessible(true) output.close()
method
} }
def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = { def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
methodDeSerialize.invoke(null, Utilities.runtimeSerializationKryo.get(), is, clazz) deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz)
.asInstanceOf[UDFType] .asInstanceOf[UDFType]
} }
def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = { def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
methodSerialize.invoke(null, Utilities.runtimeSerializationKryo.get(), function, out) serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
} }
private var instance: AnyRef = null private var instance: AnyRef = null
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment