Skip to content
Snippets Groups Projects
Commit 6cb6096c authored by Forest Fang's avatar Forest Fang Committed by Reynold Xin
Browse files

[SPARK-8443][SQL] Split GenerateMutableProjection Codegen due to JVM Code Size Limits

By grouping projection calls into multiple `apply` functions, we are able to raise the number of projections that codegen can handle from ~1k to ~60k. The unit test exercises 5k projections, because running it against 60k took 15s to complete.

Author: Forest Fang <forest.fang@outlook.com>

Closes #7076 from saurfang/codegen_size_limit and squashes the following commits:

b7a7635 [Forest Fang] [SPARK-8443][SQL] Execute and verify split projections in test
adef95a [Forest Fang] [SPARK-8443][SQL] Use safer factor and rewrite splitting code
1b5aa7e [Forest Fang] [SPARK-8443][SQL] inline execution if one block only
9405680 [Forest Fang] [SPARK-8443][SQL] split projection code by size limit
parent 45d798c3
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions.codegen ...@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions.codegen
import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions._
import scala.collection.mutable.ArrayBuffer
// MutableProjection is not accessible in Java // MutableProjection is not accessible in Java
abstract class BaseMutableProjection extends MutableProjection abstract class BaseMutableProjection extends MutableProjection
...@@ -45,10 +47,41 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu ...@@ -45,10 +47,41 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
else else
${ctx.setColumn("mutableRow", e.dataType, i, evaluationCode.primitive)}; ${ctx.setColumn("mutableRow", e.dataType, i, evaluationCode.primitive)};
""" """
}.mkString("\n") }
// A single JVM method may not exceed 64KB of bytecode, so the generated projection
// snippets are bundled into several blocks rather than emitted as one method body.
val projectionBlocks = new ArrayBuffer[String]()
val pendingBlock = new StringBuilder()
projectionCode.foreach { snippet =>
  // Seal the pending block once its source text has grown beyond 16 * 1000 characters.
  if (pendingBlock.length > 16 * 1000) {
    projectionBlocks += pendingBlock.toString()
    pendingBlock.clear()
  }
  pendingBlock.append(snippet)
}
// Whatever is left over becomes the last block (an empty one when there were no snippets).
projectionBlocks += pendingBlock.toString()

// With a single block the code is inlined as-is; otherwise each block becomes its own
// private helper method and the helpers are invoked one after another.
val needsSplit = projectionBlocks.length > 1

// Definitions of the generated helper methods (empty when nothing had to be split).
val projectionFuns =
  if (needsSplit) {
    projectionBlocks.zipWithIndex.map { case (body, idx) =>
      s"""
        |private void apply$idx(InternalRow i) {
        | $body
        |}
""".stripMargin
    }.mkString
  } else {
    ""
  }

// Statements that run the projections: either the inlined block or the helper calls.
val projectionCalls =
  if (needsSplit) {
    projectionBlocks.indices.map(idx => s"apply$idx(i);").mkString("\n")
  } else {
    projectionBlocks.head
  }
val mutableStates = ctx.mutableStates.map { case (javaType, variableName, initialValue) => val mutableStates = ctx.mutableStates.map { case (javaType, variableName, initialValue) =>
s"private $javaType $variableName = $initialValue;" s"private $javaType $variableName = $initialValue;"
}.mkString("\n ") }.mkString("\n ")
val code = s""" val code = s"""
public Object generate($exprType[] expr) { public Object generate($exprType[] expr) {
return new SpecificProjection(expr); return new SpecificProjection(expr);
...@@ -75,9 +108,11 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu ...@@ -75,9 +108,11 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
return (InternalRow) mutableRow; return (InternalRow) mutableRow;
} }
$projectionFuns
public Object apply(Object _i) { public Object apply(Object _i) {
InternalRow i = (InternalRow) _i; InternalRow i = (InternalRow) _i;
$projectionCode $projectionCalls
return mutableRow; return mutableRow;
} }
......
...@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._ ...@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
/** /**
* Additional tests for code generation. * Additional tests for code generation.
*/ */
class CodeGenerationSuite extends SparkFunSuite { class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
test("multithreaded eval") { test("multithreaded eval") {
import scala.concurrent._ import scala.concurrent._
...@@ -42,4 +42,16 @@ class CodeGenerationSuite extends SparkFunSuite { ...@@ -42,4 +42,16 @@ class CodeGenerationSuite extends SparkFunSuite {
futures.foreach(Await.result(_, 10.seconds)) futures.foreach(Await.result(_, 10.seconds))
} }
test("SPARK-8443: split wide projections into blocks due to JVM code size limit") {
  // Generate a very wide projection of identical `1 = 1` expressions, evaluate it
  // against an empty mutable row, and verify that every output column is true.
  val length = 5000
  val expected = Seq.fill(length)(true)
  val expressions = List.fill(length)(EqualTo(Literal(1), Literal(1)))
  val newProjection = GenerateMutableProjection.generate(expressions)
  val actual = newProjection().apply(new GenericMutableRow(length)).toSeq
  val evaluatedCorrectly = checkResult(actual, expected)
  if (!evaluatedCorrectly) {
    fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
  }
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment