Commit 4374a46b authored by Brian Cho, committed by Reynold Xin

[SPARK-16162] Remove dead code OrcTableScan.

## What changes were proposed in this pull request?

SPARK-14535 removed all calls to the `OrcTableScan` class, leaving it as dead code. This patch deletes the unused class.
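
For context: after SPARK-14535, ORC scans go through the shared `FileFormat` read path, so nothing changes for end users; ORC data is still read through the normal DataFrame reader. A minimal sketch (the session setup and input path below are illustrative, not part of this patch):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: ORC reads flow through the standard DataFrame reader,
// not OrcTableScan. The application name and path are hypothetical.
val spark = SparkSession.builder()
  .appName("orc-read-sketch")
  .enableHiveSupport()   // ORC support lives in the Hive module in Spark 2.0
  .getOrCreate()

// Predicate pushdown into the ORC reader is gated by this SQL conf
// (it was not enabled by default when this commit landed).
spark.conf.set("spark.sql.orc.filterPushdown", "true")

val df = spark.read.orc("/tmp/people.orc")   // hypothetical input path
df.filter("age > 21").show()                 // the filter may be pushed into the ORC reader
```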

## How was this patch tested?

Existing unit tests.

Author: Brian Cho <bcho@fb.com>

Closes #13869 from dafrista/clean-up-orctablescan.
parent f34b5c62
...
@@ -27,12 +27,10 @@ import org.apache.hadoop.hive.ql.io.orc._
 import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
 import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
+import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
...
@@ -260,69 +258,6 @@ private[orc] class OrcOutputWriter(
   }
 }
-private[orc] case class OrcTableScan(
-    @transient sparkSession: SparkSession,
-    attributes: Seq[Attribute],
-    filters: Array[Filter],
-    @transient inputPaths: Seq[FileStatus])
-  extends Logging
-  with HiveInspectors {
-  def execute(): RDD[InternalRow] = {
-    val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
-    val conf = job.getConfiguration
-    // Figure out the actual schema from the ORC source (without partition columns) so that we
-    // can pick the correct ordinals. Note that this assumes that all files have the same schema.
-    val orcFormat = new OrcFileFormat
-    val dataSchema =
-      orcFormat
-        .inferSchema(sparkSession, Map.empty, inputPaths)
-        .getOrElse(sys.error("Failed to read schema from target ORC files."))
-    // Tries to push down filters if ORC filter push-down is enabled
-    if (sparkSession.sessionState.conf.orcFilterPushDown) {
-      OrcFilters.createFilter(dataSchema, filters).foreach { f =>
-        conf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
-        conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
-      }
-    }
-    // Sets requested columns
-    OrcRelation.setRequiredColumns(conf, dataSchema, StructType.fromAttributes(attributes))
-    if (inputPaths.isEmpty) {
-      // the input path probably be pruned, return an empty RDD.
-      return sparkSession.sparkContext.emptyRDD[InternalRow]
-    }
-    FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*)
-    val inputFormatClass =
-      classOf[OrcInputFormat]
-        .asInstanceOf[Class[_ <: MapRedInputFormat[NullWritable, Writable]]]
-    val rdd = sparkSession.sparkContext.hadoopRDD(
-      conf.asInstanceOf[JobConf],
-      inputFormatClass,
-      classOf[NullWritable],
-      classOf[Writable]
-    ).asInstanceOf[HadoopRDD[NullWritable, Writable]]
-    val wrappedConf = new SerializableConfiguration(conf)
-    rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
-      val writableIterator = iterator.map(_._2)
-      val maybeStructOI = OrcFileOperator.getObjectInspector(split.getPath.toString, Some(conf))
-      OrcRelation.unwrapOrcStructs(
-        wrappedConf.value,
-        StructType.fromAttributes(attributes),
-        maybeStructOI,
-        writableIterator
-      )
-    }
-  }
-}
 private[orc] object OrcTableScan {
   // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
   private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
...
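
The `OrcTableScan` companion object survives only for the `SARG_PUSHDOWN` key, which the remaining ORC read path uses to hand a Kryo-serialized Hive `SearchArgument` to `OrcInputFormat`. A hedged sketch of that wiring, mirroring the removed `execute()` body (the helper name `pushDownOrcFilters` is made up for illustration; the real call sites live inside the package-private ORC file format code):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Sketch only: mirrors how the removed OrcTableScan.execute() pushed filters down.
// OrcFilters and OrcTableScan are private[orc], so this only compiles inside that package.
def pushDownOrcFilters(
    hadoopConf: Configuration,
    dataSchema: StructType,
    filters: Array[Filter]): Unit = {
  OrcFilters.createFilter(dataSchema, filters).foreach { searchArg =>
    // Serialize the Hive SearchArgument with Kryo and stash it under "sarg.pushdown"
    // so OrcInputFormat can skip stripes/row groups that cannot match.
    hadoopConf.set(OrcTableScan.SARG_PUSHDOWN, searchArg.toKryo)
    // Tell Hive's ORC reader to actually evaluate the predicate while scanning.
    hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
  }
}
```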