diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 4eeca9aec12bd550a2e93893ea60743c47bb98f2..7e89109259955f36135023bac377531262ae13ee 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -25,9 +25,9 @@ import com.google.common.base.Objects import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit} -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit, OrcStruct} +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector +import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, StructTypeInfo} import org.apache.hadoop.io.{NullWritable, Writable} import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat @@ -89,21 +89,10 @@ private[orc] class OrcOutputWriter( TypeInfoUtils.getTypeInfoFromTypeString( HiveMetastoreTypes.toMetastoreType(dataSchema)) - TypeInfoUtils - .getStandardJavaObjectInspectorFromTypeInfo(typeInfo) - .asInstanceOf[StructObjectInspector] + OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo]) + .asInstanceOf[SettableStructObjectInspector] } - // Used to hold temporary `Writable` fields of the next row to be written. - private val reusableOutputBuffer = new Array[Any](dataSchema.length) - - // Used to convert Catalyst values into Hadoop `Writable`s. - private val wrappers = structOI.getAllStructFieldRefs.asScala - .zip(dataSchema.fields.map(_.dataType)) - .map { case (ref, dt) => - wrapperFor(ref.getFieldObjectInspector, dt) - }.toArray - // `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this // flag to decide whether `OrcRecordWriter.close()` needs to be called. private var recordWriterInstantiated = false @@ -127,16 +116,32 @@ private[orc] class OrcOutputWriter( override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") - override protected[sql] def writeInternal(row: InternalRow): Unit = { + private def wrapOrcStruct( + struct: OrcStruct, + oi: SettableStructObjectInspector, + row: InternalRow): Unit = { + val fieldRefs = oi.getAllStructFieldRefs var i = 0 - while (i < row.numFields) { - reusableOutputBuffer(i) = wrappers(i)(row.get(i, dataSchema(i).dataType)) + while (i < fieldRefs.size) { + oi.setStructFieldData( + struct, + fieldRefs.get(i), + wrap( + row.get(i, dataSchema(i).dataType), + fieldRefs.get(i).getFieldObjectInspector, + dataSchema(i).dataType)) i += 1 } + } + + val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct] + + override protected[sql] def writeInternal(row: InternalRow): Unit = { + wrapOrcStruct(cachedOrcStruct, structOI, row) recordWriter.write( NullWritable.get(), - serializer.serialize(reusableOutputBuffer, structOI)) + serializer.serialize(cachedOrcStruct, structOI)) } override def close(): Unit = { @@ -259,7 +264,7 @@ private[orc] case class OrcTableScan( maybeStructOI.map { soi => val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) => - soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal + soi.getStructFieldRef(attr.name) -> ordinal }.unzip val unwrappers = fieldRefs.map(unwrapperFor) // Map each tuple to a row object diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 744d46293814183ed87d17afffac30f05b28545b..8bc33fcf5d9065b1ef4f00a6e278891dcb3d4921 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -287,6 +287,20 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } } + test("SPARK-9170: Don't implicitly lowercase of user-provided columns") { + withTempPath { dir => + val path = dir.getCanonicalPath + + sqlContext.range(0, 10).select('id as "Acol").write.format("orc").save(path) + sqlContext.read.format("orc").load(path).schema("Acol") + intercept[IllegalArgumentException] { + sqlContext.read.format("orc").load(path).schema("acol") + } + checkAnswer(sqlContext.read.format("orc").load(path).select("acol").sort("acol"), + (0 until 10).map(Row(_))) + } + } + test("SPARK-8501: Avoids discovery schema from empty ORC files") { withTempPath { dir => val path = dir.getCanonicalPath