From 1fa644497aed0a6d22f5fc7bf8e752508053b75b Mon Sep 17 00:00:00 2001
From: Sean Zhong <seanzhong@databricks.com>
Date: Fri, 5 Aug 2016 11:19:20 +0800
Subject: [PATCH] [SPARK-16907][SQL] Fix performance regression for parquet
 table when vectorized parquet record reader is not being used

## What changes were proposed in this pull request?

For a non-partitioned parquet table, when the vectorized parquet record reader is not used, Spark 2.0 performs an extra, unnecessary memory copy for each row in order to append partition values.

There are several typical cases in which the vectorized parquet record reader is not used:
1. When the table schema is not flat, e.g. it contains nested fields.
2. When `spark.sql.parquet.enableVectorizedReader = false` (illustrated below).
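
As a minimal illustration of case 2, the non-vectorized path can also be forced explicitly for a session (the table under `/tmp/nested` is just a throwaway example):

```
// Force the parquet-mr (non-vectorized) read path for this session
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

// Scans of this table now go through the ParquetRecordReader fallback,
// which is the code path touched by this fix
spark.range(1000).select(struct($"id").as("nc")).write.parquet("/tmp/nested")
spark.read.parquet("/tmp/nested").filter($"nc.id" < 100).collect()
```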

Fixing this bug gives about a 20% - 30% performance gain in a test case like the following:

```
// Generate a parquet table with nested columns
spark.range(100000000).select(struct($"id").as("nc")).write.parquet("/tmp/data4")

def time[R](block: => R): Long = {
    val t0 = System.nanoTime()
    val result = block    // call-by-name
    val t1 = System.nanoTime()
    println("Elapsed time: " + (t1 - t0)/1000000 + "ms")
    (t1 - t0)/1000000
}

val x = ((0 until 20).toList.map(x => time(spark.read.parquet("/tmp/data4").filter($"nc.id" < 100).collect()))).sum/20
```
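
The fix itself is small: the parquet-mr fallback only wraps the row iterator in the per-row `JoinedRow` / `appendPartitionColumns` copy when there actually are partition columns to append. A rough sketch of that pattern, with a hypothetical `appendExtras` helper standing in for the real append logic:

```
// Sketch only: skip the per-element map (and the copies it implies) when there is
// nothing to append. The real change operates on Iterator[InternalRow] inside
// ParquetFileFormat.buildReader; the names here are illustrative.
def maybeAppend[A](rows: Iterator[A], extras: Seq[A])(appendExtras: (A, Seq[A]) => A): Iterator[A] = {
  if (extras.isEmpty) {
    rows                                        // no partition columns: hand back the iterator unchanged
  } else {
    rows.map(row => appendExtras(row, extras))  // otherwise pay the per-row append cost
  }
}
```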

## How was this patch tested?

After a few warm-up runs, we see a 26% performance improvement.

Before fix:
```
Average: 4584ms, raw data (10 tries): 4726ms 4509ms 4454ms 4879ms 4586ms 4733ms 4500ms 4361ms 4456ms 4640ms
```

After fix:
```
Average: 3614ms, raw data (10 tries): 3554ms 3740ms 4019ms 3439ms 3460ms 3664ms 3557ms 3584ms 3612ms 3531ms
```

Test env: Intel(R) Core(TM) i7-6700 CPU  3.40GHz, Intel SSD SC2KW24

Author: Sean Zhong <seanzhong@databricks.com>

Closes #14445 from clockfly/fix_parquet_regression_2.
---
 .../execution/datasources/parquet/ParquetFileFormat.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index c3e75f1934..ea32506c09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -368,6 +368,7 @@ private[sql] class ParquetFileFormat
         vectorizedReader
       } else {
         logDebug(s"Falling back to parquet-mr")
+        // ParquetRecordReader returns UnsafeRow
         val reader = pushed match {
           case Some(filter) =>
             new ParquetRecordReader[UnsafeRow](
@@ -394,8 +395,13 @@ private[sql] class ParquetFileFormat
         // This is a horrible erasure hack...  if we type the iterator above, then it actually check
         // the type in next() and we get a class cast exception.  If we make that function return
         // Object, then we can defer the cast until later!
-        iter.asInstanceOf[Iterator[InternalRow]]
+        if (partitionSchema.length == 0) {
+          // There are no partition columns
+          iter.asInstanceOf[Iterator[InternalRow]]
+        } else {
+          iter.asInstanceOf[Iterator[InternalRow]]
             .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+        }
       }
     }
   }
-- 
GitLab