From ec622eb7e1ffd0775c9ca4683d1032ca8d41654a Mon Sep 17 00:00:00 2001
From: Andrew Ray <ray.andrew@gmail.com>
Date: Fri, 18 Nov 2016 11:19:49 -0800
Subject: [PATCH] [SPARK-18457][SQL] ORC and other columnar formats using
 HiveShim read all columns when doing a simple count

## What changes were proposed in this pull request?

When reading zero columns (e.g., count(*)) from ORC or any other format that uses HiveShim, actually set the read column list to empty for Hive to use.

## How was this patch tested?

Query correctness is handled by existing unit tests. I'm happy to add more if anyone can point out some case that is not covered.

Reduction in data read can be verified in the UI when built with a recent version of Hadoop say:
```
build/mvn -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.0 -Phive -DskipTests clean package
```
However the default Hadoop 2.2 that is used for unit tests does not report actual bytes read and instead just full file sizes (see FileScanRDD.scala line 80). Therefore I don't think there is a good way to add a unit test for this.

I tested with the following setup using above build options
```
case class OrcData(intField: Long, stringField: String)
spark.range(1,1000000).map(i => OrcData(i, s"part-$i")).toDF().write.format("orc").save("orc_test")

sql(
      s"""CREATE EXTERNAL TABLE orc_test(
         |  intField LONG,
         |  stringField STRING
         |)
         |STORED AS ORC
         |LOCATION '${System.getProperty("user.dir") + "/orc_test"}'
       """.stripMargin)
```

## Results

query | Spark 2.0.2 | this PR
---|---|---
`sql("select count(*) from orc_test").collect`|4.4 MB|199.4 KB
`sql("select intField from orc_test").collect`|743.4 KB|743.4 KB
`sql("select * from orc_test").collect`|4.4 MB|4.4 MB

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #15898 from aray/sql-orc-no-col.

(cherry picked from commit 795e9fc9213cb9941ae131aadcafddb94bde5f74)
Signed-off-by: Reynold Xin <rxin@databricks.com>
---
 .../org/apache/spark/sql/hive/HiveShim.scala  |  6 ++---
 .../spark/sql/hive/orc/OrcQuerySuite.scala    | 25 ++++++++++++++++++-
 2 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index 0d2a765a38..9e9894803c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -69,13 +69,13 @@ private[hive] object HiveShim {
   }
 
   /*
-   * Cannot use ColumnProjectionUtils.appendReadColumns directly, if ids is null or empty
+   * Cannot use ColumnProjectionUtils.appendReadColumns directly, if ids is null
    */
   def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) {
-    if (ids != null && ids.nonEmpty) {
+    if (ids != null) {
       ColumnProjectionUtils.appendReadColumns(conf, ids.asJava)
     }
-    if (names != null && names.nonEmpty) {
+    if (names != null) {
       appendReadColumnNames(conf, names)
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index ecb5972984..a628977af2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -20,11 +20,13 @@ package org.apache.spark.sql.hive.orc
 import java.nio.charset.StandardCharsets
 import java.sql.Timestamp
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.ql.io.orc.{OrcStruct, SparkOrcNewRecordReader}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, RecordReaderIterator}
 import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -577,4 +579,25 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       assert(spark.table(tableName).schema == schema.copy(fields = expectedFields))
     }
   }
+
+  test("Empty schema does not read data from ORC file") {
+    val data = Seq((1, 1), (2, 2))
+    withOrcFile(data) { path =>
+      val requestedSchema = StructType(Nil)
+      val conf = new Configuration()
+      val physicalSchema = OrcFileOperator.readSchema(Seq(path), Some(conf)).get
+      OrcRelation.setRequiredColumns(conf, physicalSchema, requestedSchema)
+      val maybeOrcReader = OrcFileOperator.getFileReader(path, Some(conf))
+      assert(maybeOrcReader.isDefined)
+      val orcRecordReader = new SparkOrcNewRecordReader(
+        maybeOrcReader.get, conf, 0, maybeOrcReader.get.getContentLength)
+
+      val recordsIterator = new RecordReaderIterator[OrcStruct](orcRecordReader)
+      try {
+        assert(recordsIterator.next().toString == "{null, null}")
+      } finally {
+        recordsIterator.close()
+      }
+    }
+  }
 }
-- 
GitLab