Commit e5226e30 authored by Rajesh Balamohan, committed by Reynold Xin

[SPARK-14551][SQL] Reduce number of NameNode calls in OrcRelation

## What changes were proposed in this pull request?
When FileSourceStrategy is used, a record reader is created, which internally incurs a NameNode (NN) call. Later, in OrcRelation.unwrapOrcStructs, the file is read again to obtain the ObjectInspector, which incurs an additional NN call. It would be good to avoid this additional NN call (specifically for partitioned datasets).

Added SparkOrcNewRecordReader, which is very similar to OrcNewInputFormat.OrcRecordReader but with the option of exposing the ObjectInspector. This eliminates the need to open the file again later to generate the object inspector, which is especially useful for partitioned tables/datasets.
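
To make the change concrete, here is a minimal, illustrative sketch (not part of this patch) of how a caller can drive the new reader so that the ObjectInspector comes from the file handle that is already open for the split. The names `path`, `offset`, and `length` are placeholders for what FileSourceStrategy derives from a PartitionedFile:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.{OrcFile, SparkOrcNewRecordReader}
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

object OrcSplitReadSketch {
  // Illustrative only: `path`, `offset` and `length` stand in for the values
  // a PartitionedFile / FileSplit would provide.
  def readSplit(conf: Configuration, path: String, offset: Long, length: Long): Unit = {
    // Single file open: the Reader created here is reused both for reading
    // records and for obtaining the ObjectInspector.
    val orcReader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(conf))
    val recordReader = new SparkOrcNewRecordReader(orcReader, conf, offset, length)

    // Previously the inspector was looked up again via a second file open in
    // OrcRelation.unwrapOrcStructs; now it is taken from the already-open Reader.
    val oi = recordReader.getObjectInspector.asInstanceOf[StructObjectInspector]

    try {
      while (recordReader.nextKeyValue()) {
        val row = recordReader.getCurrentValue
        // ... unwrap `row` into Spark rows using `oi` ...
      }
    } finally {
      recordReader.close()
    }
  }
}
```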

## How was this patch tested?
Ran TPC-DS queries manually and also verified by running org.apache.spark.sql.hive.orc.OrcSuite, org.apache.spark.sql.hive.orc.OrcQuerySuite, org.apache.spark.sql.hive.orc.OrcPartitionDiscoverySuite, OrcHadoopFsRelationSuite, and org.apache.spark.sql.hive.execution.HiveCompatibilitySuite.


Author: Rajesh Balamohan <rbalamohan@apache.org>

Closes #12319 from rajeshbalamohan/SPARK-14551.
parent 95faa731
SparkOrcNewRecordReader.java (new file)

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.io.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.List;

/**
 * This is based on hive-exec-1.2.1
 * {@link org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat.OrcRecordReader}.
 * This class exposes getObjectInspector which can be used for reducing
 * NameNode calls in OrcRelation.
 */
public class SparkOrcNewRecordReader extends
    org.apache.hadoop.mapreduce.RecordReader<NullWritable, OrcStruct> {
  private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
  private final int numColumns;
  OrcStruct value;
  private float progress = 0.0f;
  private ObjectInspector objectInspector;

  public SparkOrcNewRecordReader(Reader file, Configuration conf,
      long offset, long length) throws IOException {
    List<OrcProto.Type> types = file.getTypes();
    numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
    value = new OrcStruct(numColumns);
    this.reader = OrcInputFormat.createReaderFromFile(file, conf, offset,
        length);
    this.objectInspector = file.getObjectInspector();
  }

  @Override
  public void close() throws IOException {
    reader.close();
  }

  @Override
  public NullWritable getCurrentKey() throws IOException,
      InterruptedException {
    return NullWritable.get();
  }

  @Override
  public OrcStruct getCurrentValue() throws IOException,
      InterruptedException {
    return value;
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return progress;
  }

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (reader.hasNext()) {
      reader.next(value);
      progress = reader.getProgress();
      return true;
    } else {
      return false;
    }
  }

  public ObjectInspector getObjectInspector() {
    return objectInspector;
  }
}
OrcRelation.scala

@@ -31,7 +31,6 @@ import org.apache.hadoop.io.{NullWritable, Writable}
 import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{HadoopRDD, RDD}
@@ -145,20 +144,24 @@ private[sql] class DefaultSource
         val job = Job.getInstance(conf)
         FileInputFormat.setInputPaths(job, file.filePath)
 
-        val inputFormat = new OrcNewInputFormat
         val fileSplit = new FileSplit(
           new Path(new URI(file.filePath)), file.start, file.length, Array.empty
         )
-
-        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
-        val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-        inputFormat.createRecordReader(fileSplit, hadoopAttemptContext)
+        // Custom OrcRecordReader is used to get
+        // ObjectInspector during recordReader creation itself and can
+        // avoid NameNode call in unwrapOrcStructs per file.
+        // Specifically would be helpful for partitioned datasets.
+        val orcReader = OrcFile.createReader(
+          new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
+        new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart(), fileSplit.getLength())
       }
 
       // Unwraps `OrcStruct`s to `UnsafeRow`s
       val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
-        file.filePath, conf, requiredSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
-      )
+        conf,
+        requiredSchema,
+        Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
+        new RecordReaderIterator[OrcStruct](orcRecordReader))
 
       // Appends partition values
       val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
@@ -322,10 +325,11 @@ private[orc] case class OrcTableScan(
     rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
       val writableIterator = iterator.map(_._2)
+      val maybeStructOI = OrcFileOperator.getObjectInspector(split.getPath.toString, Some(conf))
       OrcRelation.unwrapOrcStructs(
-        split.getPath.toString,
         wrappedConf.value,
         StructType.fromAttributes(attributes),
+        maybeStructOI,
         writableIterator
       )
     }
@@ -355,12 +359,11 @@ private[orc] object OrcRelation extends HiveInspectors {
   )
 
   def unwrapOrcStructs(
-      filePath: String,
       conf: Configuration,
       dataSchema: StructType,
+      maybeStructOI: Option[StructObjectInspector],
       iterator: Iterator[Writable]): Iterator[InternalRow] = {
     val deserializer = new OrcSerde
-    val maybeStructOI = OrcFileOperator.getObjectInspector(filePath, Some(conf))
     val mutableRow = new SpecificMutableRow(dataSchema.map(_.dataType))
     val unsafeProjection = UnsafeProjection.create(dataSchema)
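
As a recap of the hunks above (illustrative call shapes only, reusing identifiers from the surrounding diff rather than a standalone program): the ObjectInspector is now always supplied by the caller, so unwrapOrcStructs itself never goes back to the file system.

```scala
// FileSourceStrategy path (DefaultSource.buildReader): the inspector comes from
// the SparkOrcNewRecordReader that already holds the open Reader.
OrcRelation.unwrapOrcStructs(
  conf,
  requiredSchema,
  Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
  new RecordReaderIterator[OrcStruct](orcRecordReader))

// Legacy OrcTableScan path: the inspector is still resolved once per split via
// OrcFileOperator, but the lookup now happens in the caller rather than inside
// unwrapOrcStructs.
val maybeStructOI = OrcFileOperator.getObjectInspector(split.getPath.toString, Some(conf))
OrcRelation.unwrapOrcStructs(
  wrappedConf.value,
  StructType.fromAttributes(attributes),
  maybeStructOI,
  writableIterator)
```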