Commit 990c9f79 authored by Liang-Chi Hsieh, committed by Cheng Lian

[SPARK-9170] [SQL] Use OrcStructInspector to be case preserving when writing ORC files

JIRA: https://issues.apache.org/jira/browse/SPARK-9170

`StandardStructObjectInspector` implicitly lowercases column names, but the ORC format has no such requirement. In fact, there is an `OrcStructInspector` dedicated to the ORC format. We should use it when serializing rows to ORC files, so that column-name case is preserved when writing.
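As a minimal illustration of that difference (not part of this patch; the type string and value bindings below are hypothetical), both inspectors can be built from the same type info to compare how they treat a mixed-case field name:

import org.apache.hadoop.hive.ql.io.orc.OrcStruct
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}

// A hypothetical Hive type string with a mixed-case field name.
val typeInfo = TypeInfoUtils.getTypeInfoFromTypeString("struct<Acol:bigint>")

// Old path: the standard Java object inspector, which (per SPARK-9170)
// implicitly lowercases struct field names, so "Acol" becomes "acol".
val standardOI = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo)

// New path: the ORC-specific inspector keeps the field name as written and is
// settable, so a single OrcStruct instance can be refilled for every row.
val orcOI = OrcStruct
  .createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
  .asInstanceOf[SettableStructObjectInspector]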

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #7520 from viirya/use_orcstruct.
parent 6ceed852
@@ -25,9 +25,9 @@ import com.google.common.base.Objects
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit}
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit, OrcStruct}
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
+import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, StructTypeInfo}
 import org.apache.hadoop.io.{NullWritable, Writable}
 import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
@@ -89,21 +89,10 @@ private[orc] class OrcOutputWriter(
     TypeInfoUtils.getTypeInfoFromTypeString(
       HiveMetastoreTypes.toMetastoreType(dataSchema))

-    TypeInfoUtils
-      .getStandardJavaObjectInspectorFromTypeInfo(typeInfo)
-      .asInstanceOf[StructObjectInspector]
+    OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
+      .asInstanceOf[SettableStructObjectInspector]
   }

-  // Used to hold temporary `Writable` fields of the next row to be written.
-  private val reusableOutputBuffer = new Array[Any](dataSchema.length)
-
-  // Used to convert Catalyst values into Hadoop `Writable`s.
-  private val wrappers = structOI.getAllStructFieldRefs.asScala
-    .zip(dataSchema.fields.map(_.dataType))
-    .map { case (ref, dt) =>
-      wrapperFor(ref.getFieldObjectInspector, dt)
-    }.toArray
-
   // `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this
   // flag to decide whether `OrcRecordWriter.close()` needs to be called.
   private var recordWriterInstantiated = false
@@ -127,16 +116,32 @@ private[orc] class OrcOutputWriter(
   override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")

-  override protected[sql] def writeInternal(row: InternalRow): Unit = {
+  private def wrapOrcStruct(
+      struct: OrcStruct,
+      oi: SettableStructObjectInspector,
+      row: InternalRow): Unit = {
+    val fieldRefs = oi.getAllStructFieldRefs
     var i = 0
-    while (i < row.numFields) {
-      reusableOutputBuffer(i) = wrappers(i)(row.get(i, dataSchema(i).dataType))
+    while (i < fieldRefs.size) {
+      oi.setStructFieldData(
+        struct,
+        fieldRefs.get(i),
+        wrap(
+          row.get(i, dataSchema(i).dataType),
+          fieldRefs.get(i).getFieldObjectInspector,
+          dataSchema(i).dataType))
       i += 1
     }
+  }
+
+  val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
+
+  override protected[sql] def writeInternal(row: InternalRow): Unit = {
+    wrapOrcStruct(cachedOrcStruct, structOI, row)
+
     recordWriter.write(
       NullWritable.get(),
-      serializer.serialize(reusableOutputBuffer, structOI))
+      serializer.serialize(cachedOrcStruct, structOI))
   }

   override def close(): Unit = {
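To make the new write path easier to follow outside the diff, here is a hedged, self-contained sketch of the same pattern: build the case-preserving inspector, create one OrcStruct and reuse it, fill its fields through the settable inspector, then serialize with OrcSerde, which is the Writable the writer above hands to its record writer. The type string and field value are illustrative only, not taken from this commit.

import org.apache.hadoop.hive.ql.io.orc.{OrcSerde, OrcStruct}
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
import org.apache.hadoop.io.Text

val typeInfo = TypeInfoUtils.getTypeInfoFromTypeString("struct<Acol:string>")
val structOI = OrcStruct
  .createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
  .asInstanceOf[SettableStructObjectInspector]

// One struct instance, created once and refilled for every row.
val orcStruct = structOI.create().asInstanceOf[OrcStruct]
val fieldRef = structOI.getAllStructFieldRefs.get(0)
println(fieldRef.getFieldName)  // expected to keep its original casing: "Acol"

// Fill the field and serialize; the resulting Writable is what an ORC
// RecordWriter would be given, mirroring writeInternal above.
structOI.setStructFieldData(orcStruct, fieldRef, new Text("hello"))
val serde = new OrcSerde
val serialized = serde.serialize(orcStruct, structOI)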
@@ -259,7 +264,7 @@ private[orc] case class OrcTableScan(
     maybeStructOI.map { soi =>
       val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
         case (attr, ordinal) =>
-          soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
+          soi.getStructFieldRef(attr.name) -> ordinal
       }.unzip
       val unwrappers = fieldRefs.map(unwrapperFor)
       // Map each tuple to a row object
@@ -287,6 +287,20 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }

+  test("SPARK-9170: Don't implicitly lowercase of user-provided columns") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      sqlContext.range(0, 10).select('id as "Acol").write.format("orc").save(path)
+      sqlContext.read.format("orc").load(path).schema("Acol")
+
+      intercept[IllegalArgumentException] {
+        sqlContext.read.format("orc").load(path).schema("acol")
+      }
+
+      checkAnswer(sqlContext.read.format("orc").load(path).select("acol").sort("acol"),
+        (0 until 10).map(Row(_)))
+    }
+  }
+
   test("SPARK-8501: Avoids discovery schema from empty ORC files") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath