Skip to content
Snippets Groups Projects
Commit d24801ad authored by Liang-Chi Hsieh, committed by Davies Liu
Browse files

[SPARK-13636] [SQL] Directly consume UnsafeRow in wholestage codegen plans

JIRA: https://issues.apache.org/jira/browse/SPARK-13636

## What changes were proposed in this pull request?

As shown in the wholestage codegen version of the Sort operator, when Sort is on top of Exchange (or another operator that produces UnsafeRow), we create variables from the UnsafeRow and then create another UnsafeRow from these variables. We should avoid this unnecessary unpacking and repacking of variables from UnsafeRows.

## How was this patch tested?

All existing wholestage codegen tests should pass.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #11484 from viirya/direct-consume-unsaferow.
parent 74267beb
No related branches found
No related tags found
No related merge requests found
...@@ -93,7 +93,7 @@ case class Expand( ...@@ -93,7 +93,7 @@ case class Expand(
child.asInstanceOf[CodegenSupport].produce(ctx, this) child.asInstanceOf[CodegenSupport].produce(ctx, this)
} }
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
/* /*
* When the projections list looks like: * When the projections list looks like:
* expr1A, exprB, expr1C * expr1A, exprB, expr1C
......
...@@ -105,6 +105,8 @@ case class Sort( ...@@ -105,6 +105,8 @@ case class Sort(
// Name of sorter variable used in codegen. // Name of sorter variable used in codegen.
private var sorterVariable: String = _ private var sorterVariable: String = _
override def preferUnsafeRow: Boolean = true
override protected def doProduce(ctx: CodegenContext): String = { override protected def doProduce(ctx: CodegenContext): String = {
val needToSort = ctx.freshName("needToSort") val needToSort = ctx.freshName("needToSort")
ctx.addMutableState("boolean", needToSort, s"$needToSort = true;") ctx.addMutableState("boolean", needToSort, s"$needToSort = true;")
...@@ -153,18 +155,22 @@ case class Sort( ...@@ -153,18 +155,22 @@ case class Sort(
""".stripMargin.trim """.stripMargin.trim
} }
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
val colExprs = child.output.zipWithIndex.map { case (attr, i) => if (row != null) {
BoundReference(i, attr.dataType, attr.nullable) s"$sorterVariable.insertRow((UnsafeRow)$row);"
} } else {
val colExprs = child.output.zipWithIndex.map { case (attr, i) =>
BoundReference(i, attr.dataType, attr.nullable)
}
ctx.currentVars = input ctx.currentVars = input
val code = GenerateUnsafeProjection.createCode(ctx, colExprs) val code = GenerateUnsafeProjection.createCode(ctx, colExprs)
s""" s"""
| // Convert the input attributes to an UnsafeRow and add it to the sorter | // Convert the input attributes to an UnsafeRow and add it to the sorter
| ${code.code} | ${code.code}
| $sorterVariable.insertRow(${code.value}); | $sorterVariable.insertRow(${code.value});
""".stripMargin.trim """.stripMargin.trim
}
} }
} }
...@@ -65,7 +65,12 @@ trait CodegenSupport extends SparkPlan { ...@@ -65,7 +65,12 @@ trait CodegenSupport extends SparkPlan {
/** /**
* Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan. * Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan.
*/ */
private var parent: CodegenSupport = null protected var parent: CodegenSupport = null
/**
* Whether this SparkPlan prefers to accept UnsafeRow as input in doConsume.
*/
def preferUnsafeRow: Boolean = false
/** /**
* Returns all the RDDs of InternalRow which generates the input rows. * Returns all the RDDs of InternalRow which generates the input rows.
...@@ -176,11 +181,20 @@ trait CodegenSupport extends SparkPlan { ...@@ -176,11 +181,20 @@ trait CodegenSupport extends SparkPlan {
} else { } else {
input input
} }
val evaluated =
if (row != null && preferUnsafeRow) {
// Current plan can consume UnsafeRows directly.
""
} else {
evaluateRequiredVariables(child.output, inputVars, usedInputs)
}
s""" s"""
| |
|/*** CONSUME: ${toCommentSafeString(this.simpleString)} */ |/*** CONSUME: ${toCommentSafeString(this.simpleString)} */
|${evaluateRequiredVariables(child.output, inputVars, usedInputs)} |${evaluated}
|${doConsume(ctx, inputVars)} |${doConsume(ctx, inputVars, row)}
""".stripMargin """.stripMargin
} }
...@@ -195,7 +209,7 @@ trait CodegenSupport extends SparkPlan { ...@@ -195,7 +209,7 @@ trait CodegenSupport extends SparkPlan {
* if (isNull1 || !value2) continue; * if (isNull1 || !value2) continue;
* # call consume(), which will call parent.doConsume() * # call consume(), which will call parent.doConsume()
*/ */
protected def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { protected def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
throw new UnsupportedOperationException throw new UnsupportedOperationException
} }
} }
...@@ -238,7 +252,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport ...@@ -238,7 +252,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport
s""" s"""
| while (!shouldStop() && $input.hasNext()) { | while (!shouldStop() && $input.hasNext()) {
| InternalRow $row = (InternalRow) $input.next(); | InternalRow $row = (InternalRow) $input.next();
| ${consume(ctx, columns).trim} | ${consume(ctx, columns, row).trim}
| } | }
""".stripMargin """.stripMargin
} }
......
...@@ -139,7 +139,7 @@ case class TungstenAggregate( ...@@ -139,7 +139,7 @@ case class TungstenAggregate(
} }
} }
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
if (groupingExpressions.isEmpty) { if (groupingExpressions.isEmpty) {
doConsumeWithoutKeys(ctx, input) doConsumeWithoutKeys(ctx, input)
} else { } else {
......
...@@ -49,7 +49,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) ...@@ -49,7 +49,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
references.filter(a => usedMoreThanOnce.contains(a.exprId)) references.filter(a => usedMoreThanOnce.contains(a.exprId))
} }
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
val exprs = projectList.map(x => val exprs = projectList.map(x =>
ExpressionCanonicalizer.execute(BindReferences.bindReference(x, child.output))) ExpressionCanonicalizer.execute(BindReferences.bindReference(x, child.output)))
ctx.currentVars = input ctx.currentVars = input
...@@ -88,7 +88,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit ...@@ -88,7 +88,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit
child.asInstanceOf[CodegenSupport].produce(ctx, this) child.asInstanceOf[CodegenSupport].produce(ctx, this)
} }
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
val numOutput = metricTerm(ctx, "numOutputRows") val numOutput = metricTerm(ctx, "numOutputRows")
val expr = ExpressionCanonicalizer.execute( val expr = ExpressionCanonicalizer.execute(
BindReferences.bindReference(condition, child.output)) BindReferences.bindReference(condition, child.output))
......
...@@ -136,7 +136,7 @@ package object debug { ...@@ -136,7 +136,7 @@ package object debug {
child.asInstanceOf[CodegenSupport].produce(ctx, this) child.asInstanceOf[CodegenSupport].produce(ctx, this)
} }
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
consume(ctx, input) consume(ctx, input)
} }
} }
......
...@@ -107,7 +107,7 @@ case class BroadcastHashJoin( ...@@ -107,7 +107,7 @@ case class BroadcastHashJoin(
streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this) streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)
} }
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
if (joinType == Inner) { if (joinType == Inner) {
codegenInner(ctx, input) codegenInner(ctx, input)
} else { } else {
......
...@@ -65,7 +65,7 @@ trait BaseLimit extends UnaryNode with CodegenSupport { ...@@ -65,7 +65,7 @@ trait BaseLimit extends UnaryNode with CodegenSupport {
child.asInstanceOf[CodegenSupport].produce(ctx, this) child.asInstanceOf[CodegenSupport].produce(ctx, this)
} }
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
val stopEarly = ctx.freshName("stopEarly") val stopEarly = ctx.freshName("stopEarly")
ctx.addMutableState("boolean", stopEarly, s"$stopEarly = false;") ctx.addMutableState("boolean", stopEarly, s"$stopEarly = false;")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment