From aa494a9c2ebd59baec47beb434cd09bf3f188218 Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Sat, 31 Oct 2015 21:16:09 -0700
Subject: [PATCH] [SPARK-11117] [SPARK-11345] [SQL] Makes all HadoopFsRelation
 data sources produce UnsafeRow

This PR fixes two issues:

1.  `PhysicalRDD.outputsUnsafeRows` is always `false`

    Thus a `ConvertToUnsafe` operator is often required even if the underlying data source relation does output `UnsafeRow`.

1.  Internal/external row conversion for `HadoopFsRelation` is kinda messy

    Currently we're using `HadoopFsRelation.needConversion` and [dirty type erasure hacks][1] to indicate whether the relation outputs external row or internal row and apply external-to-internal conversion when necessary.  Basically, all builtin `HadoopFsRelation` data sources, i.e. Parquet, JSON, ORC, and Text output `InternalRow`, while typical external `HadoopFsRelation` data sources, e.g. spark-avro and spark-csv, output `Row`.

This PR adds a `private[sql]` interface method `HadoopFsRelation.buildInternalScan`, which by default invokes `HadoopFsRelation.buildScan` and converts `Row`s to `UnsafeRow`s (which are also `InternalRow`s).  All builtin `HadoopFsRelation` data sources override this method and directly output `UnsafeRow`s.  In this way, now `HadoopFsRelation` always produces `UnsafeRow`s. Thus `PhysicalRDD.outputsUnsafeRows` can be properly set by checking whether the underlying data source is a `HadoopFsRelation`.

A remaining question is that, can we assume that all non-builtin `HadoopFsRelation` data sources output external rows?  At least all well known ones do so.  However it's possible that some users implemented their own `HadoopFsRelation` data sources that leverages `InternalRow` and thus all those unstable internal data representations.  If this assumption is safe, we can deprecate `HadoopFsRelation.needConversion` and cleanup some more conversion code (like [here][2] and [here][3]).

This PR supersedes #9125.

Follow-ups:

1.  Makes JSON and ORC data sources output `UnsafeRow` directly

1.  Makes `HiveTableScan` output `UnsafeRow` directly

    This is related to 1 since ORC data source shares the same `Writable` unwrapping code with `HiveTableScan`.

[1]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala#L353
[2]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L331-L335
[3]: https://github.com/apache/spark/blob/v1.5.1/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L630-L669

Author: Cheng Lian <lian@databricks.com>

Closes #9305 from liancheng/spark-11345.unsafe-hadoop-fs-relation.
---
 .../sql/columnar/GenerateColumnAccessor.scala |  2 +-
 .../spark/sql/execution/ExistingRDD.scala     |  8 ++--
 .../datasources/DataSourceStrategy.scala      | 30 ++++++++-----
 .../datasources/json/JSONRelation.scala       | 18 +++++---
 .../parquet/CatalystRecordMaterializer.scala  |  2 +-
 .../parquet/CatalystRowConverter.scala        |  8 +++-
 .../datasources/parquet/ParquetRelation.scala |  6 +--
 .../datasources/text/DefaultSource.scala      | 34 +++++++++-----
 .../apache/spark/sql/sources/interfaces.scala | 44 +++++++++++++++++--
 .../parquet/ParquetQuerySuite.scala           |  2 +-
 .../spark/sql/hive/orc/OrcRelation.scala      | 30 ++++++-------
 .../sql/sources/hadoopFsRelationSuites.scala  | 31 +++++++++++++
 12 files changed, 156 insertions(+), 59 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala
index 7980a6f36d..ff9393b465 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala
@@ -34,7 +34,7 @@ abstract class ColumnarIterator extends Iterator[InternalRow] {
 /**
  * An helper class to update the fields of UnsafeRow, used by ColumnAccessor
  *
- * WARNNING: These setter MUST be called in increasing order of ordinals.
+ * WARNING: These setter MUST be called in increasing order of ordinals.
  */
 class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(null) {
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 87bd92e00a..7a466cf6a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.sources.{HadoopFsRelation, BaseRelation}
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{Row, SQLContext}
 
@@ -93,7 +93,9 @@ private[sql] case class LogicalRDD(
 private[sql] case class PhysicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
-    extraInformation: String) extends LeafNode {
+    extraInformation: String,
+    override val outputsUnsafeRows: Boolean = false)
+  extends LeafNode {
 
   protected override def doExecute(): RDD[InternalRow] = rdd
 
@@ -105,7 +107,7 @@ private[sql] object PhysicalRDD {
       output: Seq[Attribute],
       rdd: RDD[InternalRow],
       relation: BaseRelation): PhysicalRDD = {
-    PhysicalRDD(output, rdd, relation.toString)
+    PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation])
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index af6626c897..65859865c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -17,21 +17,21 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.spark.{Logging, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.{SaveMode, Strategy, execution, sources, _}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, TaskContext}
 
 /**
  * A Strategy for planning scans over data sources defined using the sources API.
@@ -106,8 +106,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         l,
         projects,
         filters,
-        (a, f) =>
-          toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil
+        (a, f) => t.buildInternalScan(a.map(_.name).toArray, f, t.paths, confBroadcast)) :: Nil
 
     case l @ LogicalRelation(baseRelation: TableScan, _) =>
       execution.PhysicalRDD.createFromDataSource(
@@ -152,7 +151,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
           // Don't scan any partition columns to save I/O.  Here we are being optimistic and
           // assuming partition columns data stored in data files are always consistent with those
           // partition values encoded in partition directory paths.
-          val dataRows = relation.buildScan(
+          val dataRows = relation.buildInternalScan(
             requiredDataColumns.map(_.name).toArray, filters, Array(dir), confBroadcast)
 
           // Merges data values with partition values.
@@ -161,7 +160,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
             requiredDataColumns,
             partitionColumns,
             partitionValues,
-            toCatalystRDD(logicalRelation, requiredDataColumns, dataRows))
+            dataRows)
         }
 
         val unionedRows =
@@ -199,15 +198,24 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       // Builds `AttributeReference`s for all partition columns so that we can use them to project
       // required partition columns.  Note that if a partition column appears in `requiredColumns`,
       // we should use the `AttributeReference` in `requiredColumns`.
-      val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap
-      val partitionColumns = partitionColumnSchema.toAttributes.map { a =>
-        requiredColumnMap.getOrElse(a.name, a)
+      val partitionColumns = {
+        val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap
+        partitionColumnSchema.toAttributes.map { a =>
+          requiredColumnMap.getOrElse(a.name, a)
+        }
       }
 
       val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
-        val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
+        // Note that we can't use an `UnsafeRowJoiner` to replace the following `JoinedRow` and
+        // `UnsafeProjection`.  Because the projection may also adjust column order.
         val mutableJoinedRow = new JoinedRow()
-        iterator.map(dataRow => projection(mutableJoinedRow(dataRow, partitionValues)))
+        val unsafePartitionValues = UnsafeProjection.create(partitionColumnSchema)(partitionValues)
+        val unsafeProjection =
+          UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
+
+        iterator.map { unsafeDataRow =>
+          unsafeProjection(mutableJoinedRow(unsafeDataRow, unsafePartitionValues))
+        }
       }
 
       // This is an internal RDD whose call site the user should not be concerned with
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 5f104fca7d..85b52f04c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -34,6 +34,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
 import org.apache.spark.sql.execution.datasources.PartitionSpec
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
@@ -122,14 +123,21 @@ private[sql] class JSONRelation(
     jsonSchema
   }
 
-  override def buildScan(
+  override private[sql] def buildInternalScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
-      inputPaths: Array[FileStatus]): RDD[Row] = {
-    JacksonParser(
+      inputPaths: Array[FileStatus],
+      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
+    val requiredDataSchema = StructType(requiredColumns.map(dataSchema(_)))
+    val rows = JacksonParser(
       inputRDD.getOrElse(createBaseRdd(inputPaths)),
-      StructType(requiredColumns.map(dataSchema(_))),
-      sqlContext.conf.columnNameOfCorruptRecord).asInstanceOf[RDD[Row]]
+      requiredDataSchema,
+      sqlContext.conf.columnNameOfCorruptRecord)
+
+    rows.mapPartitions { iterator =>
+      val unsafeProjection = UnsafeProjection.create(requiredDataSchema)
+      iterator.map(unsafeProjection)
+    }
   }
 
   override def equals(other: Any): Boolean = other match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala
index ed9e0aa659..eeead9f5d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRecordMaterializer.scala
@@ -35,7 +35,7 @@ private[parquet] class CatalystRecordMaterializer(
 
   private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater)
 
-  override def getCurrentRecord: InternalRow = rootConverter.currentRow
+  override def getCurrentRecord: InternalRow = rootConverter.currentRecord
 
   override def getRootConverter: GroupConverter = rootConverter
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index b16c46579f..1f653cd3d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -163,10 +163,14 @@ private[parquet] class CatalystRowConverter(
     override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
   }
 
+  private val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
+
+  private val unsafeProjection = UnsafeProjection.create(catalystType)
+
   /**
-   * Represents the converted row object once an entire Parquet record is converted.
+   * The [[UnsafeRow]] converted from an entire Parquet record.
    */
-  val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
+  def currentRecord: UnsafeRow = unsafeProjection(currentRow)
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 44649a68b3..5a7c6b95b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -282,11 +282,11 @@ private[sql] class ParquetRelation(
     }
   }
 
-  override def buildScan(
+  override def buildInternalScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
       inputFiles: Array[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
+      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
     val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
     val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
@@ -361,7 +361,7 @@ private[sql] class ParquetRelation(
               id, i, rawSplits.get(i).asInstanceOf[InputSplit with Writable])
           }
         }
-      }.asInstanceOf[RDD[Row]]  // type erasure hack to pass RDD[InternalRow] as RDD[Row]
+      }
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index ab26c57ad1..52c4421d7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -25,16 +25,20 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext, Job}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeRowWriter, BufferHolder}
+import org.apache.spark.sql.columnar.MutableUnsafeRow
 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
 import org.apache.spark.sql.execution.datasources.PartitionSpec
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.SerializableConfiguration
 
 /**
  * A data source for reading text files.
@@ -79,8 +83,12 @@ private[sql] class TextRelation(
   /** This is an internal data source that outputs internal row format. */
   override val needConversion: Boolean = false
 
-  /** Read path. */
-  override def buildScan(inputPaths: Array[FileStatus]): RDD[Row] = {
+
+  override private[sql] def buildInternalScan(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      inputPaths: Array[FileStatus],
+      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
     val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
     val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
     val paths = inputPaths.map(_.getPath).sortBy(_.toUri)
@@ -92,17 +100,19 @@ private[sql] class TextRelation(
     sqlContext.sparkContext.hadoopRDD(
       conf.asInstanceOf[JobConf], classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
       .mapPartitions { iter =>
-        var buffer = new Array[Byte](1024)
-        val row = new GenericMutableRow(1)
+        val bufferHolder = new BufferHolder
+        val unsafeRowWriter = new UnsafeRowWriter
+        val unsafeRow = new UnsafeRow
+
         iter.map { case (_, line) =>
-          if (line.getLength > buffer.length) {
-            buffer = new Array[Byte](line.getLength)
-          }
-          System.arraycopy(line.getBytes, 0, buffer, 0, line.getLength)
-          row.update(0, UTF8String.fromBytes(buffer, 0, line.getLength))
-          row
+          // Writes to an UnsafeRow directly
+          bufferHolder.reset()
+          unsafeRowWriter.initialize(bufferHolder, 1)
+          unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
+          unsafeRow.pointTo(bufferHolder.buffer, 1, bufferHolder.totalSize())
+          unsafeRow
         }
-      }.asInstanceOf[RDD[Row]]
+      }
   }
 
   /** Write path. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index a9a013e936..7a55351148 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -585,11 +585,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     })
   }
 
-  final private[sql] def buildScan(
+  final private[sql] def buildInternalScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
       inputPaths: Array[String],
-      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
+      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
     val inputStatuses = inputPaths.flatMap { input =>
       val path = new Path(input)
 
@@ -604,7 +604,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       }
     }
 
-    buildScan(requiredColumns, filters, inputStatuses, broadcastedConf)
+    buildInternalScan(requiredColumns, filters, inputStatuses, broadcastedConf)
   }
 
   /**
@@ -740,6 +740,44 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     buildScan(requiredColumns, filters, inputFiles)
   }
 
+  /**
+   * For a non-partitioned relation, this method builds an `RDD[InternalRow]` containing all rows
+   * within this relation. For partitioned relations, this method is called for each selected
+   * partition, and builds an `RDD[InternalRow]` containing all rows within that single partition.
+   *
+   * Note:
+   *
+   * 1. Rows contained in the returned `RDD[InternalRow]` are assumed to be `UnsafeRow`s.
+   * 2. This interface is subject to change in future.
+   *
+   * @param requiredColumns Required columns.
+   * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction
+   *        of all `filters`.  The pushed down filters are currently purely an optimization as they
+   *        will all be evaluated again. This means it is safe to use them with methods that produce
+   *        false positives such as filtering partitions based on a bloom filter.
+   * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the
+   *        relation. For a partitioned relation, it contains paths of all data files in a single
+   *        selected partition.
+   * @param broadcastedConf A shared broadcast Hadoop Configuration, which can be used to reduce the
+   *        overhead of broadcasting the Configuration for every Hadoop RDD.
+   */
+  private[sql] def buildInternalScan(
+      requiredColumns: Array[String],
+      filters: Array[Filter],
+      inputFiles: Array[FileStatus],
+      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
+    val requiredSchema = StructType(requiredColumns.map(dataSchema.apply))
+    val internalRows = {
+      val externalRows = buildScan(requiredColumns, filters, inputFiles, broadcastedConf)
+      execution.RDDConversions.rowToRowRdd(externalRows, requiredSchema.map(_.dataType))
+    }
+
+    internalRows.mapPartitions { iterator =>
+      val unsafeProjection = UnsafeProjection.create(requiredSchema)
+      iterator.map(unsafeProjection)
+    }
+  }
+
   /**
    * Prepares a write job and returns an [[OutputWriterFactory]].  Client side job preparation can
    * be put here.  For example, user defined output committer can be configured here
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index baff7f5752..70fae32b7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,8 +22,8 @@ import java.io.File
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{TableIdentifier, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index d1f30e188e..45de567039 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -19,21 +19,20 @@ package org.apache.spark.sql.hive.orc
 
 import java.util.Properties
 
-import scala.collection.JavaConverters._
-
 import com.google.common.base.Objects
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit, OrcStruct}
 import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
-import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, StructTypeInfo}
+import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
 import org.apache.hadoop.io.{NullWritable, Writable}
 import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.Logging
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.{HadoopRDD, RDD}
@@ -199,12 +198,13 @@ private[sql] class OrcRelation(
       partitionColumns)
   }
 
-  override def buildScan(
+  override private[sql] def buildInternalScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
-      inputPaths: Array[FileStatus]): RDD[Row] = {
+      inputPaths: Array[FileStatus],
+      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
     val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
-    OrcTableScan(output, this, filters, inputPaths).execute().asInstanceOf[RDD[Row]]
+    OrcTableScan(output, this, filters, inputPaths).execute()
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
@@ -253,16 +253,17 @@ private[orc] case class OrcTableScan(
       path: String,
       conf: Configuration,
       iterator: Iterator[Writable],
-      nonPartitionKeyAttrs: Seq[(Attribute, Int)],
-      mutableRow: MutableRow): Iterator[InternalRow] = {
+      nonPartitionKeyAttrs: Seq[Attribute]): Iterator[InternalRow] = {
     val deserializer = new OrcSerde
     val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf))
+    val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
+    val unsafeProjection = UnsafeProjection.create(StructType.fromAttributes(nonPartitionKeyAttrs))
 
     // SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero
     // rows, and thus couldn't give a proper ObjectInspector.  In this case we just return an empty
     // partition since we know that this file is empty.
     maybeStructOI.map { soi =>
-      val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
+      val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.zipWithIndex.map {
         case (attr, ordinal) =>
           soi.getStructFieldRef(attr.name) -> ordinal
       }.unzip
@@ -280,7 +281,7 @@ private[orc] case class OrcTableScan(
           }
           i += 1
         }
-        mutableRow: InternalRow
+        unsafeProjection(mutableRow)
       }
     }.getOrElse {
       Iterator.empty
@@ -322,13 +323,8 @@ private[orc] case class OrcTableScan(
     val wrappedConf = new SerializableConfiguration(conf)
 
     rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
-      val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
-      fillObject(
-        split.getPath.toString,
-        wrappedConf.value,
-        iterator.map(_._2),
-        attributes.zipWithIndex,
-        mutableRow)
+      val writableIterator = iterator.map(_._2)
+      fillObject(split.getPath.toString, wrappedConf.value, writableIterator, attributes)
     }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index e3605bb3f6..100b97137c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.ConvertToUnsafe
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -687,6 +688,36 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
     }
   }
+
+  test("HadoopFsRelation produces UnsafeRow") {
+    withTempTable("test_unsafe") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        sqlContext.range(3).write.format(dataSourceName).save(path)
+        sqlContext.read
+          .format(dataSourceName)
+          .option("dataSchema", new StructType().add("id", LongType, nullable = false).json)
+          .load(path)
+          .registerTempTable("test_unsafe")
+
+        val df = sqlContext.sql(
+          """SELECT COUNT(*)
+            |FROM test_unsafe a JOIN test_unsafe b
+            |WHERE a.id = b.id
+          """.stripMargin)
+
+        val plan = df.queryExecution.executedPlan
+
+        assert(
+          plan.collect { case plan: ConvertToUnsafe => plan }.isEmpty,
+          s"""Query plan shouldn't have ${classOf[ConvertToUnsafe].getSimpleName} node(s):
+             |$plan
+           """.stripMargin)
+
+        checkAnswer(df, Row(3))
+      }
+    }
+  }
 }
 
 // This class is used to test SPARK-8578. We should not use any custom output committer when
-- 
GitLab