Commit c18fa464 authored by Ergin Seyfe, committed by Yin Huai

[SPARK-15280][Input/Output] Refactored OrcOutputWriter and moved serialization to a new class.

## What changes were proposed in this pull request?
Refactoring: separated the ORC serialization logic out of OrcOutputWriter and moved it into a new class, OrcSerializer.
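
In condensed form, the split looks like this (a sketch distilled from the diff below; imports and the `recordWriter` setup are elided):

```scala
// Sketch of the new split, condensed from the diff below.
private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration)
  extends HiveInspectors {

  // Owns the Hive OrcSerde, the object inspector, and a reused OrcStruct buffer.
  def serialize(row: InternalRow): Writable = {
    wrapOrcStruct(cachedOrcStruct, structOI, row)   // copy row fields into the buffer
    serializer.serialize(cachedOrcStruct, structOI) // hand the buffer to the SerDe
  }
  // ... serializer, structOI, cachedOrcStruct, wrapOrcStruct as in the diff ...
}

private[orc] class OrcOutputWriter(/* path, bucketId, dataSchema, context */)
  extends OutputWriter { // no longer mixes in HiveInspectors

  private[this] val serializer = new OrcSerializer(dataSchema, conf)

  // Writing a row is now a single delegation to the serializer.
  override protected[sql] def writeInternal(row: InternalRow): Unit = {
    recordWriter.write(NullWritable.get(), serializer.serialize(row))
  }
}
```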

## How was this patch tested?
Manual tests & existing tests.
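
For context, a minimal round-trip smoke check of this path would look roughly like the sketch below (not a test from this PR; `spark` is assumed to be a SparkSession and the path is a placeholder):

```scala
// Hypothetical round-trip check of the ORC write path (illustrative only).
val dir = "/tmp/orc_serializer_check" // placeholder output path
spark.range(0, 100).selectExpr("id", "cast(id as string) as s")
  .write.mode("overwrite").orc(dir)   // exercises OrcOutputWriter -> OrcSerializer
assert(spark.read.orc(dir).count() == 100) // rows survive the serialization round trip
```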

Author: Ergin Seyfe <eseyfe@fb.com>

Closes #13066 from seyfe/orc_serializer.
parent 201a51f3
@@ -149,39 +149,70 @@ private[sql] class DefaultSource
   }
 }
 
-private[orc] class OrcOutputWriter(
-    path: String,
-    bucketId: Option[Int],
-    dataSchema: StructType,
-    context: TaskAttemptContext)
-  extends OutputWriter with HiveInspectors {
+private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration)
+  extends HiveInspectors {
+
+  def serialize(row: InternalRow): Writable = {
+    wrapOrcStruct(cachedOrcStruct, structOI, row)
+    serializer.serialize(cachedOrcStruct, structOI)
+  }
 
-  private val serializer = {
+  private[this] val serializer = {
     val table = new Properties()
     table.setProperty("columns", dataSchema.fieldNames.mkString(","))
     table.setProperty("columns.types", dataSchema.map(_.dataType.catalogString).mkString(":"))
 
     val serde = new OrcSerde
-    val configuration = context.getConfiguration
-    serde.initialize(configuration, table)
+    serde.initialize(conf, table)
     serde
   }
 
-  // Object inspector converted from the schema of the relation to be written.
-  private val structOI = {
+  // Object inspector converted from the schema of the relation to be serialized.
+  private[this] val structOI = {
     val typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(dataSchema.catalogString)
     OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
       .asInstanceOf[SettableStructObjectInspector]
   }
 
+  private[this] val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
+
+  private[this] def wrapOrcStruct(
+      struct: OrcStruct,
+      oi: SettableStructObjectInspector,
+      row: InternalRow): Unit = {
+    val fieldRefs = oi.getAllStructFieldRefs
+    var i = 0
+    while (i < fieldRefs.size) {
+      oi.setStructFieldData(
+        struct,
+        fieldRefs.get(i),
+        wrap(
+          row.get(i, dataSchema(i).dataType),
+          fieldRefs.get(i).getFieldObjectInspector,
+          dataSchema(i).dataType))
+      i += 1
+    }
+  }
+}
+
+private[orc] class OrcOutputWriter(
+    path: String,
+    bucketId: Option[Int],
+    dataSchema: StructType,
+    context: TaskAttemptContext)
+  extends OutputWriter {
+
+  private[this] val conf = context.getConfiguration
+
+  private[this] val serializer = new OrcSerializer(dataSchema, conf)
+
   // `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this
   // flag to decide whether `OrcRecordWriter.close()` needs to be called.
   private var recordWriterInstantiated = false
 
   private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
     recordWriterInstantiated = true
 
-    val conf = context.getConfiguration
     val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
     val taskAttemptId = context.getTaskAttemptID
     val partition = taskAttemptId.getTaskID.getId
@@ -206,33 +237,8 @@ private[orc] class OrcOutputWriter(
   override def write(row: Row): Unit =
     throw new UnsupportedOperationException("call writeInternal")
 
-  private def wrapOrcStruct(
-      struct: OrcStruct,
-      oi: SettableStructObjectInspector,
-      row: InternalRow): Unit = {
-    val fieldRefs = oi.getAllStructFieldRefs
-    var i = 0
-    while (i < fieldRefs.size) {
-      oi.setStructFieldData(
-        struct,
-        fieldRefs.get(i),
-        wrap(
-          row.get(i, dataSchema(i).dataType),
-          fieldRefs.get(i).getFieldObjectInspector,
-          dataSchema(i).dataType))
-      i += 1
-    }
-  }
-
-  val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
-
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
-    wrapOrcStruct(cachedOrcStruct, structOI, row)
-
-    recordWriter.write(
-      NullWritable.get(),
-      serializer.serialize(cachedOrcStruct, structOI))
+    recordWriter.write(NullWritable.get(), serializer.serialize(row))
   }
 
   override def close(): Unit = {