From f12f11e578169b47e3f8b18b299948c0670ba585 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Date: Thu, 31 Mar 2016 15:05:48 -0700
Subject: [PATCH] [SPARK-14138] [SQL] Fix generated SpecificColumnarIterator
 code can exceed JVM size limit for cached DataFrames

## What changes were proposed in this pull request?

This PR reduces Java byte code size of method in ```SpecificColumnarIterator``` by using two approaches:
1. Generate and call ```getTYPEColumnAccessor()``` for each type, which is actually used, for instantiating accessors
2. Group a lot of method calls (more than 4000) into a method

## How was this patch tested?

Added a new unit test to ```InMemoryColumnarQuerySuite```

Here is generate code

```java
/* 033 */   private org.apache.spark.sql.execution.columnar.CachedBatch batch = null;
/* 034 */
/* 035 */   private org.apache.spark.sql.execution.columnar.IntColumnAccessor accessor;
/* 036 */   private org.apache.spark.sql.execution.columnar.IntColumnAccessor accessor1;
/* 037 */
/* 038 */   public SpecificColumnarIterator() {
/* 039 */     this.nativeOrder = ByteOrder.nativeOrder();
/* 030 */     this.mutableRow = new MutableUnsafeRow(rowWriter);
/* 041 */   }
/* 042 */
/* 043 */   public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes,
/* 044 */     boolean columnNullables[]) {
/* 044 */     this.input = input;
/* 046 */     this.columnTypes = columnTypes;
/* 047 */     this.columnIndexes = columnIndexes;
/* 048 */   }
/* 049 */
/* 050 */
/* 051 */   private org.apache.spark.sql.execution.columnar.IntColumnAccessor getIntColumnAccessor(int idx) {
/* 052 */     byte[] buffer = batch.buffers()[columnIndexes[idx]];
/* 053 */     return new org.apache.spark.sql.execution.columnar.IntColumnAccessor(ByteBuffer.wrap(buffer).order(nativeOrder));
/* 054 */   }
/* 055 */
/* 056 */
/* 057 */
/* 058 */
/* 059 */
/* 060 */
/* 061 */   public boolean hasNext() {
/* 062 */     if (currentRow < numRowsInBatch) {
/* 063 */       return true;
/* 064 */     }
/* 065 */     if (!input.hasNext()) {
/* 066 */       return false;
/* 067 */     }
/* 068 */
/* 069 */     batch = (org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
/* 070 */     currentRow = 0;
/* 071 */     numRowsInBatch = batch.numRows();
/* 072 */     accessor = getIntColumnAccessor(0);
/* 073 */     accessor1 = getIntColumnAccessor(1);
/* 074 */
/* 075 */     return hasNext();
/* 076 */   }
/* 077 */
/* 078 */   public InternalRow next() {
/* 079 */     currentRow += 1;
/* 080 */     bufferHolder.reset();
/* 081 */     rowWriter.zeroOutNullBytes();
/* 082 */     accessor.extractTo(mutableRow, 0);
/* 083 */     accessor1.extractTo(mutableRow, 1);
/* 084 */     unsafeRow.setTotalSize(bufferHolder.totalSize());
/* 085 */     return unsafeRow;
/* 086 */   }
```

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #11984 from kiszk/SPARK-14138.
---
 .../columnar/GenerateColumnAccessor.scala     | 50 ++++++++++++++++---
 .../columnar/InMemoryColumnarQuerySuite.scala | 10 ++++
 2 files changed, 54 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index eaafc96e4d..4d01b78c3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.columnar
 
+import scala.collection.mutable
+
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -88,7 +90,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
         case array: ArrayType => classOf[ArrayColumnAccessor].getName
         case t: MapType => classOf[MapColumnAccessor].getName
       }
-      ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;")
+      ctx.addMutableState(accessorCls, accessorName, "")
 
       val createCode = dt match {
         case t if ctx.isPrimitiveType(dt) =>
@@ -97,7 +99,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
           s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
         case other =>
           s"""$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder),
-             (${dt.getClass.getName}) columnTypes[$index]);"""
+        (${dt.getClass.getName}) columnTypes[$index]);"""
       }
 
       val extract = s"$accessorName.extractTo(mutableRow, $index);"
@@ -114,6 +116,42 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
       (createCode, extract + patch)
     }.unzip
 
+    /*
+     * 200 = 6000 bytes / 30 (up to 30 bytes per one call))
+     * the maximum byte code size to be compiled for HotSpot is 8000.
+     * We should keep less than 8000
+     */
+    val numberOfStatementsThreshold = 200
+    val (initializerAccessorCalls, extractorCalls) =
+      if (initializeAccessors.length <= numberOfStatementsThreshold) {
+        (initializeAccessors.mkString("\n"), extractors.mkString("\n"))
+      } else {
+        val groupedAccessorsItr = initializeAccessors.grouped(numberOfStatementsThreshold)
+        val groupedExtractorsItr = extractors.grouped(numberOfStatementsThreshold)
+        var groupedAccessorsLength = 0
+        groupedAccessorsItr.zipWithIndex.map { case (body, i) =>
+          groupedAccessorsLength += 1
+          val funcName = s"accessors$i"
+          val funcCode = s"""
+             |private void $funcName() {
+             |  ${body.mkString("\n")}
+             |}
+           """.stripMargin
+          ctx.addNewFunction(funcName, funcCode)
+        }
+        groupedExtractorsItr.zipWithIndex.map { case (body, i) =>
+          val funcName = s"extractors$i"
+          val funcCode = s"""
+             |private void $funcName() {
+             |  ${body.mkString("\n")}
+             |}
+           """.stripMargin
+          ctx.addNewFunction(funcName, funcCode)
+        }
+        ((0 to groupedAccessorsLength - 1).map { i => s"accessors$i();" }.mkString("\n"),
+         (0 to groupedAccessorsLength - 1).map { i => s"extractors$i();" }.mkString("\n"))
+      }
+
     val code = s"""
       import java.nio.ByteBuffer;
       import java.nio.ByteOrder;
@@ -149,8 +187,6 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
           this.nativeOrder = ByteOrder.nativeOrder();
           this.buffers = new byte[${columnTypes.length}][];
           this.mutableRow = new MutableUnsafeRow(rowWriter);
-
-          ${initMutableStates(ctx)}
         }
 
         public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
@@ -159,6 +195,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
           this.columnIndexes = columnIndexes;
         }
 
+        ${declareAddedFunctions(ctx)}
+
         public boolean hasNext() {
           if (currentRow < numRowsInBatch) {
             return true;
@@ -173,7 +211,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
           for (int i = 0; i < columnIndexes.length; i ++) {
             buffers[i] = batch.buffers()[columnIndexes[i]];
           }
-          ${initializeAccessors.mkString("\n")}
+          ${initializerAccessorCalls}
 
           return hasNext();
         }
@@ -182,7 +220,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
           currentRow += 1;
           bufferHolder.reset();
           rowWriter.initialize(bufferHolder, $numFields);
-          ${extractors.mkString("\n")}
+          ${extractorCalls}
           unsafeRow.pointTo(bufferHolder.buffer, $numFields, bufferHolder.totalSize());
           return unsafeRow;
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 25afed25c8..557415b801 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -219,4 +219,14 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     assert(data.count() === 10)
     assert(data.filter($"s" === "3").count() === 1)
   }
+
+  test("SPARK-14138: Generated SpecificColumnarIterator can exceed JVM size limit for cached DF") {
+    val length1 = 3999
+    val columnTypes1 = List.fill(length1)(IntegerType)
+    val columnarIterator1 = GenerateColumnAccessor.generate(columnTypes1)
+
+    val length2 = 10000
+    val columnTypes2 = List.fill(length2)(IntegerType)
+    val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
+  }
 }
-- 
GitLab