Skip to content
Snippets Groups Projects
Commit 877dc712 authored by Kazuaki Ishizaki's avatar Kazuaki Ishizaki Committed by Davies Liu
Browse files

[SPARK-14138] [SQL] [MASTER] Fix generated SpecificColumnarIterator code can...

[SPARK-14138] [SQL] [MASTER] Fix generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames

## What changes were proposed in this pull request?

This PR reduces the Java bytecode size of the methods in ```SpecificColumnarIterator``` by grouping large numbers (more than 200) of ```ColumnAccessor``` instantiations or method calls into separate methods.

## How was this patch tested?

Added a new unit test, which includes large instantiations and method calls, to ```InMemoryColumnarQuerySuite```

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #12108 from kiszk/SPARK-14138-master.
parent 27e71a2c
No related branches found
No related tags found
No related merge requests found
...@@ -88,7 +88,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera ...@@ -88,7 +88,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
case array: ArrayType => classOf[ArrayColumnAccessor].getName case array: ArrayType => classOf[ArrayColumnAccessor].getName
case t: MapType => classOf[MapColumnAccessor].getName case t: MapType => classOf[MapColumnAccessor].getName
} }
ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;") ctx.addMutableState(accessorCls, accessorName, "")
val createCode = dt match { val createCode = dt match {
case t if ctx.isPrimitiveType(dt) => case t if ctx.isPrimitiveType(dt) =>
...@@ -114,6 +114,42 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera ...@@ -114,6 +114,42 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
(createCode, extract + patch) (createCode, extract + patch)
}.unzip }.unzip
/*
 * 200 = 6000 bytes / 30 (up to 30 bytes per call).
 * The maximum bytecode size of a method that HotSpot will compile is 8000 bytes,
 * so each generated method is kept well below that limit.
 */
val numberOfStatementsThreshold = 200
val (initializerAccessorCalls, extractorCalls) =
  if (initializeAccessors.length <= numberOfStatementsThreshold) {
    // Few enough statements: inline them directly into the calling method.
    (initializeAccessors.mkString("\n"), extractors.mkString("\n"))
  } else {
    // Splits `statements` into private helper methods named `<prefix>0`, `<prefix>1`, ...,
    // each holding at most `numberOfStatementsThreshold` statements, registers every helper
    // with `ctx`, and returns the Java statements that invoke the helpers in order.
    def splitIntoFunctions(prefix: String, statements: Seq[String]): String = {
      statements.grouped(numberOfStatementsThreshold).zipWithIndex.map { case (body, i) =>
        val funcName = s"$prefix$i"
        val funcCode = s"""
           |private void $funcName() {
           |  ${body.mkString("\n")}
           |}
         """.stripMargin
        ctx.addNewFunction(funcName, funcCode)
        s"$funcName();"
      }.toList.mkString("\n")
      // `grouped` yields an Iterator, whose `map` is lazy: `toList` forces the traversal so
      // that `ctx.addNewFunction` really runs for every group before the calls are emitted.
    }
    // Tuple elements are evaluated left to right, so accessor helpers are registered first.
    (splitIntoFunctions("accessors", initializeAccessors),
      splitIntoFunctions("extractors", extractors))
  }
val code = s""" val code = s"""
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
...@@ -149,8 +185,6 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera ...@@ -149,8 +185,6 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
this.nativeOrder = ByteOrder.nativeOrder(); this.nativeOrder = ByteOrder.nativeOrder();
this.buffers = new byte[${columnTypes.length}][]; this.buffers = new byte[${columnTypes.length}][];
this.mutableRow = new MutableUnsafeRow(rowWriter); this.mutableRow = new MutableUnsafeRow(rowWriter);
${ctx.initMutableStates()}
} }
public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) { public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
...@@ -159,6 +193,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera ...@@ -159,6 +193,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
this.columnIndexes = columnIndexes; this.columnIndexes = columnIndexes;
} }
${ctx.declareAddedFunctions()}
public boolean hasNext() { public boolean hasNext() {
if (currentRow < numRowsInBatch) { if (currentRow < numRowsInBatch) {
return true; return true;
...@@ -173,7 +209,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera ...@@ -173,7 +209,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
for (int i = 0; i < columnIndexes.length; i ++) { for (int i = 0; i < columnIndexes.length; i ++) {
buffers[i] = batch.buffers()[columnIndexes[i]]; buffers[i] = batch.buffers()[columnIndexes[i]];
} }
${initializeAccessors.mkString("\n")} ${initializerAccessorCalls}
return hasNext(); return hasNext();
} }
...@@ -182,7 +218,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera ...@@ -182,7 +218,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
currentRow += 1; currentRow += 1;
bufferHolder.reset(); bufferHolder.reset();
rowWriter.zeroOutNullBytes(); rowWriter.zeroOutNullBytes();
${extractors.mkString("\n")} ${extractorCalls}
unsafeRow.setTotalSize(bufferHolder.totalSize()); unsafeRow.setTotalSize(bufferHolder.totalSize());
return unsafeRow; return unsafeRow;
} }
......
...@@ -220,4 +220,14 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { ...@@ -220,4 +220,14 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
assert(data.count() === 10) assert(data.count() === 10)
assert(data.filter($"s" === "3").count() === 1) assert(data.filter($"s" === "3").count() === 1)
} }
test("SPARK-14138: Generated SpecificColumnarIterator can exceed JVM size limit for cached DF") {
  // Generate a columnar iterator for two wide, all-integer schemas.
  // There are no explicit assertions: the test passes as long as neither
  // `generate` call throws.
  Seq(3999, 10000).foreach { numColumns =>
    val wideSchema = List.fill(numColumns)(IntegerType)
    GenerateColumnAccessor.generate(wideSchema)
  }
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment