From 93bb0b911b6c790fa369b39da51a83d8f62da909 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Date: Sun, 26 Mar 2017 09:20:22 +0200
Subject: [PATCH] [SPARK-20046][SQL] Facilitate loop optimizations in a JIT
 compiler regarding sqlContext.read.parquet()

## What changes were proposed in this pull request?

This PR improves performance of operations with `sqlContext.read.parquet()` by changing Java code generated by Catalyst. This PR is inspired by [the blog article](https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html) and [this stackoverflow entry](http://stackoverflow.com/questions/40629435/fast-parquet-row-count-in-spark).

This PR changes generated code in the following two points.
1. Replace a while-loop with long instance variables a for-loop with int local variables
2. Suppress generation of `shouldStop()` method if this method is unnecessary (e.g. `append()` is not generated).

These points facilitates compiler optimizations in a JIT compiler by feeding the simplified Java code into the JIT compiler. The performance of `sqlContext.read.parquet().count` is improved by 1.09x.

Benchmark program:
```java
val dir = "/dev/shm/parquet"
val N = 1000 * 1000 * 40
val iters = 20
val benchmark = new Benchmark("Parquet", N * iters, minNumIters = 5, warmupTime = 30.seconds)
sparkSession.range(n).write.mode("overwrite").parquet(dir)

benchmark.addCase("count") { i: Int =>
  var n = 0
  var len = 0L
  while (n < iters) {
    len += sparkSession.read.parquet(dir).count
    n += 1
  }
}
benchmark.run
```

Performance result without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Parquet:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
w/o this PR                                   1152 / 1211        694.7           1.4       1.0X
```

Performance result with this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Parquet:                                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
with this PR                                  1053 / 1121        760.0           1.3       1.0X
```

Here is a comparison between generated code w/o and with this PR. Only the method ```agg_doAggregateWithoutKey``` is changed.

Generated code without this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private scala.collection.Iterator scan_input;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 014 */   private long scan_scanTime1;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 016 */   private int scan_batchIdx;
/* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
/* 019 */   private UnsafeRow agg_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 022 */
/* 023 */   public GeneratedIterator(Object[] references) {
/* 024 */     this.references = references;
/* 025 */   }
/* 026 */
/* 027 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 028 */     partitionIndex = index;
/* 029 */     this.inputs = inputs;
/* 030 */     agg_initAgg = false;
/* 031 */
/* 032 */     scan_input = inputs[0];
/* 033 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 034 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 035 */     scan_scanTime1 = 0;
/* 036 */     scan_batch = null;
/* 037 */     scan_batchIdx = 0;
/* 038 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 039 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 040 */     agg_result = new UnsafeRow(1);
/* 041 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 042 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 043 */
/* 044 */   }
/* 045 */
/* 046 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 047 */     // initialize aggregation buffer
/* 048 */     agg_bufIsNull = false;
/* 049 */     agg_bufValue = 0L;
/* 050 */
/* 051 */     if (scan_batch == null) {
/* 052 */       scan_nextBatch();
/* 053 */     }
/* 054 */     while (scan_batch != null) {
/* 055 */       int numRows = scan_batch.numRows();
/* 056 */       while (scan_batchIdx < numRows) {
/* 057 */         int scan_rowIdx = scan_batchIdx++;
/* 058 */         // do aggregate
/* 059 */         // common sub-expressions
/* 060 */
/* 061 */         // evaluate aggregate function
/* 062 */         boolean agg_isNull1 = false;
/* 063 */
/* 064 */         long agg_value1 = -1L;
/* 065 */         agg_value1 = agg_bufValue + 1L;
/* 066 */         // update aggregation buffer
/* 067 */         agg_bufIsNull = false;
/* 068 */         agg_bufValue = agg_value1;
/* 069 */         if (shouldStop()) return;
/* 070 */       }
/* 071 */       scan_batch = null;
/* 072 */       scan_nextBatch();
/* 073 */     }
/* 074 */     scan_scanTime.add(scan_scanTime1 / (1000 * 1000));
/* 075 */     scan_scanTime1 = 0;
/* 076 */
/* 077 */   }
/* 078 */
/* 079 */   private void scan_nextBatch() throws java.io.IOException {
/* 080 */     long getBatchStart = System.nanoTime();
/* 081 */     if (scan_input.hasNext()) {
/* 082 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 083 */       scan_numOutputRows.add(scan_batch.numRows());
/* 084 */       scan_batchIdx = 0;
/* 085 */
/* 086 */     }
/* 087 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 088 */   }
/* 089 */
/* 090 */   protected void processNext() throws java.io.IOException {
/* 091 */     while (!agg_initAgg) {
/* 092 */       agg_initAgg = true;
/* 093 */       long agg_beforeAgg = System.nanoTime();
/* 094 */       agg_doAggregateWithoutKey();
/* 095 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 096 */
/* 097 */       // output the result
/* 098 */
/* 099 */       agg_numOutputRows.add(1);
/* 100 */       agg_rowWriter.zeroOutNullBytes();
/* 101 */
/* 102 */       if (agg_bufIsNull) {
/* 103 */         agg_rowWriter.setNullAt(0);
/* 104 */       } else {
/* 105 */         agg_rowWriter.write(0, agg_bufValue);
/* 106 */       }
/* 107 */       append(agg_result);
/* 108 */     }
/* 109 */   }
/* 110 */ }
```

Generated code with this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private boolean agg_initAgg;
/* 009 */   private boolean agg_bufIsNull;
/* 010 */   private long agg_bufValue;
/* 011 */   private scala.collection.Iterator scan_input;
/* 012 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_numOutputRows;
/* 013 */   private org.apache.spark.sql.execution.metric.SQLMetric scan_scanTime;
/* 014 */   private long scan_scanTime1;
/* 015 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch scan_batch;
/* 016 */   private int scan_batchIdx;
/* 017 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_numOutputRows;
/* 018 */   private org.apache.spark.sql.execution.metric.SQLMetric agg_aggTime;
/* 019 */   private UnsafeRow agg_result;
/* 020 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 022 */
/* 023 */   public GeneratedIterator(Object[] references) {
/* 024 */     this.references = references;
/* 025 */   }
/* 026 */
/* 027 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 028 */     partitionIndex = index;
/* 029 */     this.inputs = inputs;
/* 030 */     agg_initAgg = false;
/* 031 */
/* 032 */     scan_input = inputs[0];
/* 033 */     this.scan_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[0];
/* 034 */     this.scan_scanTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 035 */     scan_scanTime1 = 0;
/* 036 */     scan_batch = null;
/* 037 */     scan_batchIdx = 0;
/* 038 */     this.agg_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 039 */     this.agg_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 040 */     agg_result = new UnsafeRow(1);
/* 041 */     this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 0);
/* 042 */     this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 1);
/* 043 */
/* 044 */   }
/* 045 */
/* 046 */   private void agg_doAggregateWithoutKey() throws java.io.IOException {
/* 047 */     // initialize aggregation buffer
/* 048 */     agg_bufIsNull = false;
/* 049 */     agg_bufValue = 0L;
/* 050 */
/* 051 */     if (scan_batch == null) {
/* 052 */       scan_nextBatch();
/* 053 */     }
/* 054 */     while (scan_batch != null) {
/* 055 */       int numRows = scan_batch.numRows();
/* 056 */       int scan_localEnd = numRows - scan_batchIdx;
/* 057 */       for (int scan_localIdx = 0; scan_localIdx < scan_localEnd; scan_localIdx++) {
/* 058 */         int scan_rowIdx = scan_batchIdx + scan_localIdx;
/* 059 */         // do aggregate
/* 060 */         // common sub-expressions
/* 061 */
/* 062 */         // evaluate aggregate function
/* 063 */         boolean agg_isNull1 = false;
/* 064 */
/* 065 */         long agg_value1 = -1L;
/* 066 */         agg_value1 = agg_bufValue + 1L;
/* 067 */         // update aggregation buffer
/* 068 */         agg_bufIsNull = false;
/* 069 */         agg_bufValue = agg_value1;
/* 070 */         // shouldStop check is eliminated
/* 071 */       }
/* 072 */       scan_batchIdx = numRows;
/* 073 */       scan_batch = null;
/* 074 */       scan_nextBatch();
/* 075 */     }
/* 079 */   }
/* 080 */
/* 081 */   private void scan_nextBatch() throws java.io.IOException {
/* 082 */     long getBatchStart = System.nanoTime();
/* 083 */     if (scan_input.hasNext()) {
/* 084 */       scan_batch = (org.apache.spark.sql.execution.vectorized.ColumnarBatch)scan_input.next();
/* 085 */       scan_numOutputRows.add(scan_batch.numRows());
/* 086 */       scan_batchIdx = 0;
/* 087 */
/* 088 */     }
/* 089 */     scan_scanTime1 += System.nanoTime() - getBatchStart;
/* 090 */   }
/* 091 */
/* 092 */   protected void processNext() throws java.io.IOException {
/* 093 */     while (!agg_initAgg) {
/* 094 */       agg_initAgg = true;
/* 095 */       long agg_beforeAgg = System.nanoTime();
/* 096 */       agg_doAggregateWithoutKey();
/* 097 */       agg_aggTime.add((System.nanoTime() - agg_beforeAgg) / 1000000);
/* 098 */
/* 099 */       // output the result
/* 100 */
/* 101 */       agg_numOutputRows.add(1);
/* 102 */       agg_rowWriter.zeroOutNullBytes();
/* 103 */
/* 104 */       if (agg_bufIsNull) {
/* 105 */         agg_rowWriter.setNullAt(0);
/* 106 */       } else {
/* 107 */         agg_rowWriter.write(0, agg_bufValue);
/* 108 */       }
/* 109 */       append(agg_result);
/* 110 */     }
/* 111 */   }
/* 112 */ }
```

## How was this patch tested?

Tested existing test suites

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #17378 from kiszk/SPARK-20046.
---
 .../sql/execution/ColumnarBatchScan.scala      | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
index 04fba17be4..e86116680a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
@@ -111,17 +111,27 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
     val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
       genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
     }
+    val localIdx = ctx.freshName("localIdx")
+    val localEnd = ctx.freshName("localEnd")
+    val numRows = ctx.freshName("numRows")
+    val shouldStop = if (isShouldStopRequired) {
+      s"if (shouldStop()) { $idx = $rowidx + 1; return; }"
+    } else {
+      "// shouldStop check is eliminated"
+    }
     s"""
        |if ($batch == null) {
        |  $nextBatch();
        |}
        |while ($batch != null) {
-       |  int numRows = $batch.numRows();
-       |  while ($idx < numRows) {
-       |    int $rowidx = $idx++;
+       |  int $numRows = $batch.numRows();
+       |  int $localEnd = $numRows - $idx;
+       |  for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
+       |    int $rowidx = $idx + $localIdx;
        |    ${consume(ctx, columnsBatchInput).trim}
-       |    if (shouldStop()) return;
+       |    $shouldStop
        |  }
+       |  $idx = $numRows;
        |  $batch = null;
        |  $nextBatch();
        |}
-- 
GitLab