From 82bb7fd41a2c7992e0aea69623c504bd439744f7 Mon Sep 17 00:00:00 2001
From: baishuo <vc_java@hotmail.com>
Date: Mon, 27 Apr 2015 14:08:05 +0800
Subject: [PATCH] [SPARK-6505] [SQL] Remove the reflection call in
 HiveFunctionWrapper
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

According to liancheng's comment in https://issues.apache.org/jira/browse/SPARK-6505, this patch removes the reflection call in HiveFunctionWrapper, and implements the functions named "deserializeObjectByKryo" and "serializeObjectByKryo" according to the functions with the same name in
org.apache.hadoop.hive.ql.exec.Utilities.java

Author: baishuo <vc_java@hotmail.com>

Closes #5660 from baishuo/SPARK-6505-20150423 and squashes the following commits:

ae61ec4 [baishuo] modify code style
78d9fa3 [baishuo] modify code style
0b522a7 [baishuo] modify code style
a5ff9c7 [baishuo] Remove the reflection call in HiveFunctionWrapper
---
 .../org/apache/spark/sql/hive/Shim13.scala    | 44 +++++++++----------
 1 file changed, 22 insertions(+), 22 deletions(-)

diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
index d331c210e8..dbc5e029e2 100644
--- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
+++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
@@ -19,11 +19,15 @@ package org.apache.spark.sql.hive
 
 import java.rmi.server.UID
 import java.util.{Properties, ArrayList => JArrayList}
+import java.io.{OutputStream, InputStream}
 
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
+import scala.reflect.ClassTag
 
 import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.Input
+import com.esotericsoftware.kryo.io.Output
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
@@ -46,6 +50,7 @@ import org.apache.hadoop.{io => hadoopIo}
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.types.{Decimal, DecimalType, UTF8String}
+import org.apache.spark.util.Utils._
 
 /**
  * This class provides the UDF creation and also the UDF instance serialization and
@@ -61,39 +66,34 @@ private[hive] case class HiveFunctionWrapper(var functionClassName: String)
   // for Serialization
   def this() = this(null)
 
-  import org.apache.spark.util.Utils._
-
   @transient
-  private val methodDeSerialize = {
-    val method = classOf[Utilities].getDeclaredMethod(
-      "deserializeObjectByKryo",
-      classOf[Kryo],
-      classOf[java.io.InputStream],
-      classOf[Class[_]])
-    method.setAccessible(true)
-
-    method
+  def deserializeObjectByKryo[T: ClassTag](
+      kryo: Kryo,
+      in: InputStream,
+      clazz: Class[_]): T = {
+    val inp = new Input(in)
+    val t: T = kryo.readObject(inp,clazz).asInstanceOf[T]
+    inp.close()
+    t
   }
 
   @transient
-  private val methodSerialize = {
-    val method = classOf[Utilities].getDeclaredMethod(
-      "serializeObjectByKryo",
-      classOf[Kryo],
-      classOf[Object],
-      classOf[java.io.OutputStream])
-    method.setAccessible(true)
-
-    method
+  def serializeObjectByKryo(
+      kryo: Kryo,
+      plan: Object,
+      out: OutputStream ) {
+    val output: Output = new Output(out)
+    kryo.writeObject(output, plan)
+    output.close()
   }
 
   def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
-    methodDeSerialize.invoke(null, Utilities.runtimeSerializationKryo.get(), is, clazz)
+    deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz)
       .asInstanceOf[UDFType]
   }
 
   def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
-    methodSerialize.invoke(null, Utilities.runtimeSerializationKryo.get(), function, out)
+    serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
   }
 
   private var instance: AnyRef = null
-- 
GitLab