Skip to content
Snippets Groups Projects
Commit e749f5de authored by wangfei's avatar wangfei Committed by Michael Armbrust
Browse files

[SPARK-4191][SQL]move wrapperFor to HiveInspectors to reuse it

Move wrapperFor from InsertIntoHiveTable to HiveInspectors so it can be reused; this method is useful wherever data is written through an ObjectInspector (such as ORC support).

Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>

Closes #3057 from scwf/reuse-wraperfor and squashes the following commits:

7ccf932 [scwf] fix conflicts
d44f4da [wangfei] fix imports
9bf1b50 [wangfei] revert no related change
9a5276a [wangfei] move wrapfor to hiveinspector to reuse them
parent c9f84004
No related branches found
No related tags found
No related merge requests found
......@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector._
......@@ -114,6 +114,51 @@ private[hive] trait HiveInspectors {
unwrap(si.getStructFieldData(data,r), r.getFieldObjectInspector)).toArray)
}
/**
 * Wraps with Hive types based on object inspector.
 *
 * Given a Hive ObjectInspector, returns a conversion function that turns a
 * Catalyst (native Scala) value into the corresponding Hive-side value that
 * the inspector expects. The wrapper is built once per inspector (recursively
 * for nested struct/list/map inspectors) so it can be applied cheaply per row.
 *
 * TODO: Consolidate all hive OI/data interface code.
 */
protected def wrapperFor(oi: ObjectInspector): Any => Any = oi match {
  // Catalyst strings -> HiveVarchar, sized to the string's own length.
  case _: JavaHiveVarcharObjectInspector =>
    (o: Any) => new HiveVarchar(o.asInstanceOf[String], o.asInstanceOf[String].size)
  // Catalyst Decimal -> Hive's decimal type via the version-specific HiveShim.
  case _: JavaHiveDecimalObjectInspector =>
    (o: Any) => HiveShim.createDecimal(o.asInstanceOf[Decimal].toBigDecimal.underlying())
  case soi: StandardStructObjectInspector =>
    // Precompute one wrapper per struct field; the field refs and the Row's
    // values are assumed to be positionally aligned.
    // NOTE(review): .map on getAllStructFieldRefs relies on an implicit
    // Java->Scala collection conversion being in scope in this file — confirm
    // against the file's imports (not visible in this diff hunk).
    val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
    (o: Any) => {
      val struct = soi.create()
      (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row]).zipped.foreach {
        (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
      }
      struct
    }
  case loi: ListObjectInspector =>
    // Wrap each element, then hand Hive a java.util.List view.
    val wrapper = wrapperFor(loi.getListElementObjectInspector)
    (o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper))
  case moi: MapObjectInspector =>
    // The Predef.Map is scala.collection.immutable.Map.
    // Since the map values can be mutable, we explicitly import scala.collection.Map at here.
    import scala.collection.Map
    val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector)
    val valueWrapper = wrapperFor(moi.getMapValueObjectInspector)
    (o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
      keyWrapper(key) -> valueWrapper(value)
    })
  // All other inspectors: the Catalyst value is passed through unchanged.
  case _ =>
    identity[Any]
}
/**
* Converts native catalyst types to the types expected by Hive
* @param a the value to be wrapped
......
......@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
import scala.collection.JavaConversions._
import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
import org.apache.hadoop.hive.common.`type`.HiveVarchar
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.MetaStoreUtils
......@@ -52,7 +52,7 @@ case class InsertIntoHiveTable(
child: SparkPlan,
overwrite: Boolean)
(@transient sc: HiveContext)
extends UnaryNode with Command {
extends UnaryNode with Command with HiveInspectors {
@transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
@transient private lazy val hiveContext = new Context(sc.hiveconf)
......@@ -68,46 +68,6 @@ case class InsertIntoHiveTable(
def output = child.output
/**
 * Wraps with Hive types based on object inspector.
 *
 * Builds a conversion function that maps a Catalyst value to the Hive-side
 * representation demanded by the given ObjectInspector, recursing into
 * struct, list, and map inspectors. (This is the copy being removed from
 * InsertIntoHiveTable by this commit; the trait-level copy is the survivor.)
 *
 * TODO: Consolidate all hive OI/data interface code.
 */
protected def wrapperFor(oi: ObjectInspector): Any => Any = oi match {
  // Catalyst strings -> HiveVarchar, sized to the string's own length.
  case _: JavaHiveVarcharObjectInspector =>
    (o: Any) => new HiveVarchar(o.asInstanceOf[String], o.asInstanceOf[String].size)
  // Catalyst Decimal -> Hive's decimal type via the version-specific HiveShim.
  case _: JavaHiveDecimalObjectInspector =>
    (o: Any) => HiveShim.createDecimal(o.asInstanceOf[Decimal].toBigDecimal.underlying())
  case soi: StandardStructObjectInspector =>
    // One wrapper per field; field refs and Row values are positionally
    // aligned. .map on the Java list works via the JavaConversions._
    // wildcard import at the top of this file.
    val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
    (o: Any) => {
      val struct = soi.create()
      (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row]).zipped.foreach {
        (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
      }
      struct
    }
  case loi: ListObjectInspector =>
    // Wrap each element, then expose the result as a java.util.List for Hive.
    val wrapper = wrapperFor(loi.getListElementObjectInspector)
    (o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper))
  case moi: MapObjectInspector =>
    // The Predef.Map is scala.collection.immutable.Map.
    // Since the map values can be mutable, we explicitly import scala.collection.Map at here.
    import scala.collection.Map
    val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector)
    val valueWrapper = wrapperFor(moi.getMapValueObjectInspector)
    (o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
      keyWrapper(key) -> valueWrapper(value)
    })
  // All other inspectors: the Catalyst value is passed through unchanged.
  case _ =>
    identity[Any]
}
def saveAsHiveFile(
rdd: RDD[Row],
valueClass: Class[_],
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment