Skip to content
Snippets Groups Projects
Commit a20fea98 authored by Cheng Hao's avatar Cheng Hao Committed by Reynold Xin
Browse files

[Spark-1461] Deferred Expression Evaluation (short-circuit evaluation)

This patch unify the foldable & nullable interface for Expression.
1) Deterministic-less UDF (like Rand()) can not be folded.
2) Short-circut will significantly improves the performance in Expression Evaluation, however, the stateful UDF should not be ignored in a short-circuit evaluation(e.g. in expression: col1 > 0 and row_sequence() < 1000, row_sequence() can not be ignored even if col1 > 0 is false)

I brought an concept of DeferredObject from Hive, which has 2 kinds of children classes (EagerResult / DeferredResult), the former requires triggering the evaluation before it's created, while the later trigger the evaluation when first called its get() method.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #446 from chenghao-intel/expression_deferred_evaluation and squashes the following commits:

d2729de [Cheng Hao] Fix the codestyle issues
a08f09c [Cheng Hao] fix bug in or/and short-circuit evaluation
af2236b [Cheng Hao] revert the short-circuit expression evaluation for IF
b7861d2 [Cheng Hao] Add Support for Deferred Expression Evaluation
parent bb98ecaf
No related branches found
No related tags found
No related merge requests found
......@@ -98,13 +98,19 @@ case class And(left: Expression, right: Expression) extends BinaryPredicate {
override def eval(input: Row): Any = {
val l = left.eval(input)
val r = right.eval(input)
if (l == false || r == false) {
false
} else if (l == null || r == null ) {
null
if (l == false) {
false
} else {
true
val r = right.eval(input)
if (r == false) {
false
} else {
if (l != null && r != null) {
true
} else {
null
}
}
}
}
}
......@@ -114,13 +120,19 @@ case class Or(left: Expression, right: Expression) extends BinaryPredicate {
override def eval(input: Row): Any = {
val l = left.eval(input)
val r = right.eval(input)
if (l == true || r == true) {
if (l == true) {
true
} else if (l == null || r == null) {
null
} else {
false
val r = right.eval(input)
if (r == true) {
true
} else {
if (l != null && r != null) {
false
} else {
null
}
}
}
}
}
......@@ -133,8 +145,12 @@ case class Equals(left: Expression, right: Expression) extends BinaryComparison
def symbol = "="
override def eval(input: Row): Any = {
val l = left.eval(input)
val r = right.eval(input)
if (l == null || r == null) null else l == r
if (l == null) {
null
} else {
val r = right.eval(input)
if (r == null) null else l == r
}
}
}
......@@ -162,7 +178,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
extends Expression {
def children = predicate :: trueValue :: falseValue :: Nil
def nullable = trueValue.nullable || falseValue.nullable
override def nullable = trueValue.nullable || falseValue.nullable
def references = children.flatMap(_.references).toSet
override lazy val resolved = childrenResolved && trueValue.dataType == falseValue.dataType
def dataType = {
......@@ -175,8 +191,9 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
}
type EvaluatedType = Any
override def eval(input: Row): Any = {
if (predicate.eval(input).asInstanceOf[Boolean]) {
if (true == predicate.eval(input)) {
trueValue.eval(input)
} else {
falseValue.eval(input)
......
......@@ -248,17 +248,31 @@ private[hive] case class HiveGenericUdf(name: String, children: Seq[Expression])
isUDFDeterministic && children.foldLeft(true)((prev, n) => prev && n.foldable)
}
protected lazy val deferedObjects = Array.fill[DeferredObject](children.length)({
new DeferredObjectAdapter
})
// Adapter from Catalyst ExpressionResult to Hive DeferredObject
class DeferredObjectAdapter extends DeferredObject {
private var func: () => Any = _
def set(func: () => Any) {
this.func = func
}
override def prepare(i: Int) = {}
override def get(): AnyRef = wrap(func())
}
val dataType: DataType = inspectorToDataType(returnInspector)
override def eval(input: Row): Any = {
returnInspector // Make sure initialized.
val args = children.map { v =>
new DeferredObject {
override def prepare(i: Int) = {}
override def get(): AnyRef = wrap(v.eval(input))
}
}.toArray
unwrap(function.evaluate(args))
var i = 0
while (i < children.length) {
val idx = i
deferedObjects(i).asInstanceOf[DeferredObjectAdapter].set(() => {children(idx).eval(input)})
i += 1
}
unwrap(function.evaluate(deferedObjects))
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment