Skip to content
Snippets Groups Projects
Commit cb6c48c8 authored by kai's avatar kai Committed by Michael Armbrust
Browse files

[SQL] Optimize arithmetic and predicate operators

Existing implementation of arithmetic operators and BinaryComparison operators have redundant type checking codes, e.g.:
Expression.n2 is used by Add/Subtract/Multiply.
(1) n2 always checks left.dataType == right.dataType. However, this checking should be done once when we resolve expression types;
(2) n2 requires dataType is a NumericType. This can be done once.

This PR optimizes arithmetic and predicate operators by removing such redundant type-checking codes.

Some preliminary benchmarking on 10G TPC-H data over 5 r3.2xlarge EC2 machines shows that this PR can reduce the query time by 5.5% to 11%.
The benchmark queries follow the template below, where OP is plus/minus/times/divide/remainder/bitwise and/bitwise or/bitwise xor.

SELECT l_returnflag,  l_linestatus, SUM(l_quantity OP cnt1), SUM(l_quantity OP cnt2), ...., SUM(l_quantity OP cnt700)
FROM (
    SELECT l_returnflag, l_linestatus, l_quantity, 1 AS cnt1, 2 AS cnt2, ..., 700 AS cnt700
    FROM lineitem
    WHERE l_shipdate <= '1998-09-01'
)
GROUP BY l_returnflag,  l_linestatus;

Author: kai <kaizeng@eecs.berkeley.edu>

Closes #4472 from kai-zeng/arithmetic-optimize and squashes the following commits:

fef0cf1 [kai] Merge branch 'master' of github.com:apache/spark into arithmetic-optimize
4b3a1bb [kai] chmod a-x
5a41e49 [kai] chmod a-x Expression.scala
cb37c94 [kai] rebase onto spark master
7f6e968 [kai] chmod 100755 -> 100644
6cddb46 [kai] format
7490dbc [kai] fix unresolved-expression exception for EqualTo
9c40bc0 [kai] fix bitwisenot
3cbd363 [kai] clean up test code
ca47801 [kai] override evalInternal for bitwise ops
8fa84a1 [kai] add bitwise or and xor
6892fc4 [kai] revert override evalInternal
f8eba24 [kai] override evalInternal
31ccdd4 [kai] rewrite all bitwise op and remove evalInternal
86297e2 [kai] generalized
cb92ae1 [kai] bitwise-and: override eval
97a7d6c [kai] bitwise-and: override evalInternal using and func
0906c39 [kai] add bitwise test
62abbbc [kai] clean up predicate and arithmetic
b34d58d [kai] add caching and benmark option
12c5b32 [kai] override eval
1cd7571 [kai] fix sqrt and maxof
03fd0c3 [kai] fix predicate
16fd84c [kai] optimize + - * / % -(unary) abs < > <= >=
fd95823 [kai] remove unnecessary type checking
24d062f [kai] test suite
parent f3ff1eb2
No related branches found
No related tags found
No related merge requests found
Showing
with 290 additions and 260 deletions
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
...@@ -77,206 +77,6 @@ abstract class Expression extends TreeNode[Expression] { ...@@ -77,206 +77,6 @@ abstract class Expression extends TreeNode[Expression] {
case u: UnresolvedAttribute => PrettyAttribute(u.name) case u: UnresolvedAttribute => PrettyAttribute(u.name)
}.toString }.toString
} }
/**
* A set of helper functions that return the correct descendant of `scala.math.Numeric[T]` type
* and do any casting necessary of child evaluation.
*/
@inline
def n1(e: Expression, i: Row, f: ((Numeric[Any], Any) => Any)): Any = {
val evalE = e.eval(i)
if (evalE == null) {
null
} else {
e.dataType match {
case n: NumericType =>
val castedFunction = f.asInstanceOf[(Numeric[n.JvmType], n.JvmType) => n.JvmType]
castedFunction(n.numeric, evalE.asInstanceOf[n.JvmType])
case other => sys.error(s"Type $other does not support numeric operations")
}
}
}
/**
* Evaluation helper function for 2 Numeric children expressions. Those expressions are supposed
* to be in the same data type, and also the return type.
* Either one of the expressions result is null, the evaluation result should be null.
*/
@inline
protected final def n2(
i: Row,
e1: Expression,
e2: Expression,
f: ((Numeric[Any], Any, Any) => Any)): Any = {
if (e1.dataType != e2.dataType) {
throw new TreeNodeException(this, s"Types do not match ${e1.dataType} != ${e2.dataType}")
}
val evalE1 = e1.eval(i)
if(evalE1 == null) {
null
} else {
val evalE2 = e2.eval(i)
if (evalE2 == null) {
null
} else {
e1.dataType match {
case n: NumericType =>
f.asInstanceOf[(Numeric[n.JvmType], n.JvmType, n.JvmType) => n.JvmType](
n.numeric, evalE1.asInstanceOf[n.JvmType], evalE2.asInstanceOf[n.JvmType])
case other => sys.error(s"Type $other does not support numeric operations")
}
}
}
}
/**
* Evaluation helper function for 2 Fractional children expressions. Those expressions are
* supposed to be in the same data type, and also the return type.
* Either one of the expressions result is null, the evaluation result should be null.
*/
@inline
protected final def f2(
i: Row,
e1: Expression,
e2: Expression,
f: ((Fractional[Any], Any, Any) => Any)): Any = {
if (e1.dataType != e2.dataType) {
throw new TreeNodeException(this, s"Types do not match ${e1.dataType} != ${e2.dataType}")
}
val evalE1 = e1.eval(i: Row)
if(evalE1 == null) {
null
} else {
val evalE2 = e2.eval(i: Row)
if (evalE2 == null) {
null
} else {
e1.dataType match {
case ft: FractionalType =>
f.asInstanceOf[(Fractional[ft.JvmType], ft.JvmType, ft.JvmType) => ft.JvmType](
ft.fractional, evalE1.asInstanceOf[ft.JvmType], evalE2.asInstanceOf[ft.JvmType])
case other => sys.error(s"Type $other does not support fractional operations")
}
}
}
}
/**
* Evaluation helper function for 1 Fractional children expression.
* if the expression result is null, the evaluation result should be null.
*/
@inline
protected final def f1(i: Row, e1: Expression, f: ((Fractional[Any], Any) => Any)): Any = {
val evalE1 = e1.eval(i: Row)
if(evalE1 == null) {
null
} else {
e1.dataType match {
case ft: FractionalType =>
f.asInstanceOf[(Fractional[ft.JvmType], ft.JvmType) => ft.JvmType](
ft.fractional, evalE1.asInstanceOf[ft.JvmType])
case other => sys.error(s"Type $other does not support fractional operations")
}
}
}
/**
* Evaluation helper function for 2 Integral children expressions. Those expressions are
* supposed to be in the same data type, and also the return type.
* Either one of the expressions result is null, the evaluation result should be null.
*/
@inline
protected final def i2(
i: Row,
e1: Expression,
e2: Expression,
f: ((Integral[Any], Any, Any) => Any)): Any = {
if (e1.dataType != e2.dataType) {
throw new TreeNodeException(this, s"Types do not match ${e1.dataType} != ${e2.dataType}")
}
val evalE1 = e1.eval(i)
if(evalE1 == null) {
null
} else {
val evalE2 = e2.eval(i)
if (evalE2 == null) {
null
} else {
e1.dataType match {
case i: IntegralType =>
f.asInstanceOf[(Integral[i.JvmType], i.JvmType, i.JvmType) => i.JvmType](
i.integral, evalE1.asInstanceOf[i.JvmType], evalE2.asInstanceOf[i.JvmType])
case i: FractionalType =>
f.asInstanceOf[(Integral[i.JvmType], i.JvmType, i.JvmType) => i.JvmType](
i.asIntegral, evalE1.asInstanceOf[i.JvmType], evalE2.asInstanceOf[i.JvmType])
case other => sys.error(s"Type $other does not support numeric operations")
}
}
}
}
/**
* Evaluation helper function for 1 Integral children expression.
* if the expression result is null, the evaluation result should be null.
*/
@inline
protected final def i1(i: Row, e1: Expression, f: ((Integral[Any], Any) => Any)): Any = {
val evalE1 = e1.eval(i)
if(evalE1 == null) {
null
} else {
e1.dataType match {
case i: IntegralType =>
f.asInstanceOf[(Integral[i.JvmType], i.JvmType) => i.JvmType](
i.integral, evalE1.asInstanceOf[i.JvmType])
case i: FractionalType =>
f.asInstanceOf[(Integral[i.JvmType], i.JvmType) => i.JvmType](
i.asIntegral, evalE1.asInstanceOf[i.JvmType])
case other => sys.error(s"Type $other does not support numeric operations")
}
}
}
/**
* Evaluation helper function for 2 Comparable children expressions. Those expressions are
* supposed to be in the same data type, and the return type should be Integer:
* Negative value: 1st argument less than 2nd argument
* Zero: 1st argument equals 2nd argument
* Positive value: 1st argument greater than 2nd argument
*
* Either one of the expressions result is null, the evaluation result should be null.
*/
@inline
protected final def c2(
i: Row,
e1: Expression,
e2: Expression,
f: ((Ordering[Any], Any, Any) => Any)): Any = {
if (e1.dataType != e2.dataType) {
throw new TreeNodeException(this, s"Types do not match ${e1.dataType} != ${e2.dataType}")
}
val evalE1 = e1.eval(i)
if(evalE1 == null) {
null
} else {
val evalE2 = e2.eval(i)
if (evalE2 == null) {
null
} else {
e1.dataType match {
case i: NativeType =>
f.asInstanceOf[(Ordering[i.JvmType], i.JvmType, i.JvmType) => Boolean](
i.ordering, evalE1.asInstanceOf[i.JvmType], evalE2.asInstanceOf[i.JvmType])
case other => sys.error(s"Type $other does not support ordered operations")
}
}
}
}
} }
abstract class BinaryExpression extends Expression with trees.BinaryNode[Expression] { abstract class BinaryExpression extends Expression with trees.BinaryNode[Expression] {
......
File mode changed from 100755 to 100644
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.expressions package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.analysis.UnresolvedException import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
case class UnaryMinus(child: Expression) extends UnaryExpression { case class UnaryMinus(child: Expression) extends UnaryExpression {
...@@ -28,8 +29,18 @@ case class UnaryMinus(child: Expression) extends UnaryExpression { ...@@ -28,8 +29,18 @@ case class UnaryMinus(child: Expression) extends UnaryExpression {
def nullable = child.nullable def nullable = child.nullable
override def toString = s"-$child" override def toString = s"-$child"
lazy val numeric = dataType match {
case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
case other => sys.error(s"Type $other does not support numeric operations")
}
override def eval(input: Row): Any = { override def eval(input: Row): Any = {
n1(child, input, _.negate(_)) val evalE = child.eval(input)
if (evalE == null) {
null
} else {
numeric.negate(evalE)
}
} }
} }
...@@ -41,18 +52,19 @@ case class Sqrt(child: Expression) extends UnaryExpression { ...@@ -41,18 +52,19 @@ case class Sqrt(child: Expression) extends UnaryExpression {
def nullable = true def nullable = true
override def toString = s"SQRT($child)" override def toString = s"SQRT($child)"
lazy val numeric = child.dataType match {
case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
case other => sys.error(s"Type $other does not support non-negative numeric operations")
}
override def eval(input: Row): Any = { override def eval(input: Row): Any = {
val evalE = child.eval(input) val evalE = child.eval(input)
if (evalE == null) { if (evalE == null) {
null null
} else { } else {
child.dataType match { val value = numeric.toDouble(evalE)
case n: NumericType => if (value < 0) null
val value = n.numeric.toDouble(evalE.asInstanceOf[n.JvmType]) else math.sqrt(value)
if (value < 0) null
else math.sqrt(value)
case other => sys.error(s"Type $other does not support non-negative numeric operations")
}
} }
} }
} }
...@@ -98,19 +110,70 @@ abstract class BinaryArithmetic extends BinaryExpression { ...@@ -98,19 +110,70 @@ abstract class BinaryArithmetic extends BinaryExpression {
case class Add(left: Expression, right: Expression) extends BinaryArithmetic { case class Add(left: Expression, right: Expression) extends BinaryArithmetic {
def symbol = "+" def symbol = "+"
override def eval(input: Row): Any = n2(input, left, right, _.plus(_, _)) lazy val numeric = dataType match {
case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
case other => sys.error(s"Type $other does not support numeric operations")
}
override def eval(input: Row): Any = {
val evalE1 = left.eval(input)
if(evalE1 == null) {
null
} else {
val evalE2 = right.eval(input)
if (evalE2 == null) {
null
} else {
numeric.plus(evalE1, evalE2)
}
}
}
} }
case class Subtract(left: Expression, right: Expression) extends BinaryArithmetic { case class Subtract(left: Expression, right: Expression) extends BinaryArithmetic {
def symbol = "-" def symbol = "-"
override def eval(input: Row): Any = n2(input, left, right, _.minus(_, _)) lazy val numeric = dataType match {
case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
case other => sys.error(s"Type $other does not support numeric operations")
}
override def eval(input: Row): Any = {
val evalE1 = left.eval(input)
if(evalE1 == null) {
null
} else {
val evalE2 = right.eval(input)
if (evalE2 == null) {
null
} else {
numeric.minus(evalE1, evalE2)
}
}
}
} }
case class Multiply(left: Expression, right: Expression) extends BinaryArithmetic { case class Multiply(left: Expression, right: Expression) extends BinaryArithmetic {
def symbol = "*" def symbol = "*"
override def eval(input: Row): Any = n2(input, left, right, _.times(_, _)) lazy val numeric = dataType match {
case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
case other => sys.error(s"Type $other does not support numeric operations")
}
override def eval(input: Row): Any = {
val evalE1 = left.eval(input)
if(evalE1 == null) {
null
} else {
val evalE2 = right.eval(input)
if (evalE2 == null) {
null
} else {
numeric.times(evalE1, evalE2)
}
}
}
} }
case class Divide(left: Expression, right: Expression) extends BinaryArithmetic { case class Divide(left: Expression, right: Expression) extends BinaryArithmetic {
...@@ -118,16 +181,25 @@ case class Divide(left: Expression, right: Expression) extends BinaryArithmetic ...@@ -118,16 +181,25 @@ case class Divide(left: Expression, right: Expression) extends BinaryArithmetic
override def nullable = true override def nullable = true
lazy val div: (Any, Any) => Any = dataType match {
case ft: FractionalType => ft.fractional.asInstanceOf[Fractional[Any]].div
case it: IntegralType => it.integral.asInstanceOf[Integral[Any]].quot
case other => sys.error(s"Type $other does not support numeric operations")
}
override def eval(input: Row): Any = { override def eval(input: Row): Any = {
val evalE2 = right.eval(input) val evalE2 = right.eval(input)
dataType match { if (evalE2 == null || evalE2 == 0) {
case _ if evalE2 == null => null null
case _ if evalE2 == 0 => null } else {
case ft: FractionalType => f1(input, left, _.div(_, evalE2.asInstanceOf[ft.JvmType])) val evalE1 = left.eval(input)
case it: IntegralType => i1(input, left, _.quot(_, evalE2.asInstanceOf[it.JvmType])) if (evalE1 == null) {
null
} else {
div(evalE1, evalE2)
}
} }
} }
} }
case class Remainder(left: Expression, right: Expression) extends BinaryArithmetic { case class Remainder(left: Expression, right: Expression) extends BinaryArithmetic {
...@@ -135,12 +207,23 @@ case class Remainder(left: Expression, right: Expression) extends BinaryArithmet ...@@ -135,12 +207,23 @@ case class Remainder(left: Expression, right: Expression) extends BinaryArithmet
override def nullable = true override def nullable = true
lazy val integral = dataType match {
case i: IntegralType => i.integral.asInstanceOf[Integral[Any]]
case i: FractionalType => i.asIntegral.asInstanceOf[Integral[Any]]
case other => sys.error(s"Type $other does not support numeric operations")
}
override def eval(input: Row): Any = { override def eval(input: Row): Any = {
val evalE2 = right.eval(input) val evalE2 = right.eval(input)
dataType match { if (evalE2 == null || evalE2 == 0) {
case _ if evalE2 == null => null null
case _ if evalE2 == 0 => null } else {
case nt: NumericType => i1(input, left, _.rem(_, evalE2.asInstanceOf[nt.JvmType])) val evalE1 = left.eval(input)
if (evalE1 == null) {
null
} else {
integral.rem(evalE1, evalE2)
}
} }
} }
} }
...@@ -151,13 +234,19 @@ case class Remainder(left: Expression, right: Expression) extends BinaryArithmet ...@@ -151,13 +234,19 @@ case class Remainder(left: Expression, right: Expression) extends BinaryArithmet
case class BitwiseAnd(left: Expression, right: Expression) extends BinaryArithmetic { case class BitwiseAnd(left: Expression, right: Expression) extends BinaryArithmetic {
def symbol = "&" def symbol = "&"
override def evalInternal(evalE1: EvaluatedType, evalE2: EvaluatedType): Any = dataType match { lazy val and: (Any, Any) => Any = dataType match {
case ByteType => (evalE1.asInstanceOf[Byte] & evalE2.asInstanceOf[Byte]).toByte case ByteType =>
case ShortType => (evalE1.asInstanceOf[Short] & evalE2.asInstanceOf[Short]).toShort ((evalE1: Byte, evalE2: Byte) => (evalE1 & evalE2).toByte).asInstanceOf[(Any, Any) => Any]
case IntegerType => evalE1.asInstanceOf[Int] & evalE2.asInstanceOf[Int] case ShortType =>
case LongType => evalE1.asInstanceOf[Long] & evalE2.asInstanceOf[Long] ((evalE1: Short, evalE2: Short) => (evalE1 & evalE2).toShort).asInstanceOf[(Any, Any) => Any]
case IntegerType =>
((evalE1: Int, evalE2: Int) => evalE1 & evalE2).asInstanceOf[(Any, Any) => Any]
case LongType =>
((evalE1: Long, evalE2: Long) => evalE1 & evalE2).asInstanceOf[(Any, Any) => Any]
case other => sys.error(s"Unsupported bitwise & operation on $other") case other => sys.error(s"Unsupported bitwise & operation on $other")
} }
override def evalInternal(evalE1: EvaluatedType, evalE2: EvaluatedType): Any = and(evalE1, evalE2)
} }
/** /**
...@@ -166,13 +255,19 @@ case class BitwiseAnd(left: Expression, right: Expression) extends BinaryArithme ...@@ -166,13 +255,19 @@ case class BitwiseAnd(left: Expression, right: Expression) extends BinaryArithme
case class BitwiseOr(left: Expression, right: Expression) extends BinaryArithmetic { case class BitwiseOr(left: Expression, right: Expression) extends BinaryArithmetic {
def symbol = "|" def symbol = "|"
override def evalInternal(evalE1: EvaluatedType, evalE2: EvaluatedType): Any = dataType match { lazy val or: (Any, Any) => Any = dataType match {
case ByteType => (evalE1.asInstanceOf[Byte] | evalE2.asInstanceOf[Byte]).toByte case ByteType =>
case ShortType => (evalE1.asInstanceOf[Short] | evalE2.asInstanceOf[Short]).toShort ((evalE1: Byte, evalE2: Byte) => (evalE1 | evalE2).toByte).asInstanceOf[(Any, Any) => Any]
case IntegerType => evalE1.asInstanceOf[Int] | evalE2.asInstanceOf[Int] case ShortType =>
case LongType => evalE1.asInstanceOf[Long] | evalE2.asInstanceOf[Long] ((evalE1: Short, evalE2: Short) => (evalE1 | evalE2).toShort).asInstanceOf[(Any, Any) => Any]
case IntegerType =>
((evalE1: Int, evalE2: Int) => evalE1 | evalE2).asInstanceOf[(Any, Any) => Any]
case LongType =>
((evalE1: Long, evalE2: Long) => evalE1 | evalE2).asInstanceOf[(Any, Any) => Any]
case other => sys.error(s"Unsupported bitwise | operation on $other") case other => sys.error(s"Unsupported bitwise | operation on $other")
} }
override def evalInternal(evalE1: EvaluatedType, evalE2: EvaluatedType): Any = or(evalE1, evalE2)
} }
/** /**
...@@ -181,13 +276,19 @@ case class BitwiseOr(left: Expression, right: Expression) extends BinaryArithmet ...@@ -181,13 +276,19 @@ case class BitwiseOr(left: Expression, right: Expression) extends BinaryArithmet
case class BitwiseXor(left: Expression, right: Expression) extends BinaryArithmetic { case class BitwiseXor(left: Expression, right: Expression) extends BinaryArithmetic {
def symbol = "^" def symbol = "^"
override def evalInternal(evalE1: EvaluatedType, evalE2: EvaluatedType): Any = dataType match { lazy val xor: (Any, Any) => Any = dataType match {
case ByteType => (evalE1.asInstanceOf[Byte] ^ evalE2.asInstanceOf[Byte]).toByte case ByteType =>
case ShortType => (evalE1.asInstanceOf[Short] ^ evalE2.asInstanceOf[Short]).toShort ((evalE1: Byte, evalE2: Byte) => (evalE1 ^ evalE2).toByte).asInstanceOf[(Any, Any) => Any]
case IntegerType => evalE1.asInstanceOf[Int] ^ evalE2.asInstanceOf[Int] case ShortType =>
case LongType => evalE1.asInstanceOf[Long] ^ evalE2.asInstanceOf[Long] ((evalE1: Short, evalE2: Short) => (evalE1 ^ evalE2).toShort).asInstanceOf[(Any, Any) => Any]
case IntegerType =>
((evalE1: Int, evalE2: Int) => evalE1 ^ evalE2).asInstanceOf[(Any, Any) => Any]
case LongType =>
((evalE1: Long, evalE2: Long) => evalE1 ^ evalE2).asInstanceOf[(Any, Any) => Any]
case other => sys.error(s"Unsupported bitwise ^ operation on $other") case other => sys.error(s"Unsupported bitwise ^ operation on $other")
} }
override def evalInternal(evalE1: EvaluatedType, evalE2: EvaluatedType): Any = xor(evalE1, evalE2)
} }
/** /**
...@@ -201,18 +302,24 @@ case class BitwiseNot(child: Expression) extends UnaryExpression { ...@@ -201,18 +302,24 @@ case class BitwiseNot(child: Expression) extends UnaryExpression {
def nullable = child.nullable def nullable = child.nullable
override def toString = s"~$child" override def toString = s"~$child"
lazy val not: (Any) => Any = dataType match {
case ByteType =>
((evalE: Byte) => (~evalE).toByte).asInstanceOf[(Any) => Any]
case ShortType =>
((evalE: Short) => (~evalE).toShort).asInstanceOf[(Any) => Any]
case IntegerType =>
((evalE: Int) => ~evalE).asInstanceOf[(Any) => Any]
case LongType =>
((evalE: Long) => ~evalE).asInstanceOf[(Any) => Any]
case other => sys.error(s"Unsupported bitwise ~ operation on $other")
}
override def eval(input: Row): Any = { override def eval(input: Row): Any = {
val evalE = child.eval(input) val evalE = child.eval(input)
if (evalE == null) { if (evalE == null) {
null null
} else { } else {
dataType match { not(evalE)
case ByteType => (~evalE.asInstanceOf[Byte]).toByte
case ShortType => (~evalE.asInstanceOf[Short]).toShort
case IntegerType => ~evalE.asInstanceOf[Int]
case LongType => ~evalE.asInstanceOf[Long]
case other => sys.error(s"Unsupported bitwise ~ operation on $other")
}
} }
} }
} }
...@@ -226,21 +333,35 @@ case class MaxOf(left: Expression, right: Expression) extends Expression { ...@@ -226,21 +333,35 @@ case class MaxOf(left: Expression, right: Expression) extends Expression {
override def children = left :: right :: Nil override def children = left :: right :: Nil
override def dataType = left.dataType override lazy val resolved =
left.resolved && right.resolved &&
left.dataType == right.dataType
override def dataType = {
if (!resolved) {
throw new UnresolvedException(this,
s"datatype. Can not resolve due to differing types ${left.dataType}, ${right.dataType}")
}
left.dataType
}
lazy val ordering = left.dataType match {
case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
case other => sys.error(s"Type $other does not support ordered operations")
}
override def eval(input: Row): Any = { override def eval(input: Row): Any = {
val leftEval = left.eval(input) val evalE1 = left.eval(input)
val rightEval = right.eval(input) val evalE2 = right.eval(input)
if (leftEval == null) { if (evalE1 == null) {
rightEval evalE2
} else if (rightEval == null) { } else if (evalE2 == null) {
leftEval evalE1
} else { } else {
val numeric = left.dataType.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]] if (ordering.compare(evalE1, evalE2) < 0) {
if (numeric.compare(leftEval, rightEval) < 0) { evalE2
rightEval
} else { } else {
leftEval evalE1
} }
} }
} }
...@@ -259,5 +380,17 @@ case class Abs(child: Expression) extends UnaryExpression { ...@@ -259,5 +380,17 @@ case class Abs(child: Expression) extends UnaryExpression {
def nullable = child.nullable def nullable = child.nullable
override def toString = s"Abs($child)" override def toString = s"Abs($child)"
override def eval(input: Row): Any = n1(child, input, _.abs(_)) lazy val numeric = dataType match {
case n: NumericType => n.numeric.asInstanceOf[Numeric[Any]]
case other => sys.error(s"Type $other does not support numeric operations")
}
override def eval(input: Row): Any = {
val evalE = child.eval(input)
if (evalE == null) {
null
} else {
numeric.abs(evalE)
}
}
} }
...@@ -18,8 +18,9 @@ ...@@ -18,8 +18,9 @@
package org.apache.spark.sql.catalyst.expressions package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.analysis.UnresolvedException import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{BinaryType, BooleanType} import org.apache.spark.sql.types.{BinaryType, BooleanType, NativeType}
object InterpretedPredicate { object InterpretedPredicate {
def apply(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) = def apply(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
...@@ -201,22 +202,118 @@ case class EqualNullSafe(left: Expression, right: Expression) extends BinaryComp ...@@ -201,22 +202,118 @@ case class EqualNullSafe(left: Expression, right: Expression) extends BinaryComp
case class LessThan(left: Expression, right: Expression) extends BinaryComparison { case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
def symbol = "<" def symbol = "<"
override def eval(input: Row): Any = c2(input, left, right, _.lt(_, _))
lazy val ordering = {
if (left.dataType != right.dataType) {
throw new TreeNodeException(this,
s"Types do not match ${left.dataType} != ${right.dataType}")
}
left.dataType match {
case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
case other => sys.error(s"Type $other does not support ordered operations")
}
}
override def eval(input: Row): Any = {
val evalE1 = left.eval(input)
if(evalE1 == null) {
null
} else {
val evalE2 = right.eval(input)
if (evalE2 == null) {
null
} else {
ordering.lt(evalE1, evalE2)
}
}
}
} }
case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison { case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
def symbol = "<=" def symbol = "<="
override def eval(input: Row): Any = c2(input, left, right, _.lteq(_, _))
lazy val ordering = {
if (left.dataType != right.dataType) {
throw new TreeNodeException(this,
s"Types do not match ${left.dataType} != ${right.dataType}")
}
left.dataType match {
case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
case other => sys.error(s"Type $other does not support ordered operations")
}
}
override def eval(input: Row): Any = {
val evalE1 = left.eval(input)
if(evalE1 == null) {
null
} else {
val evalE2 = right.eval(input)
if (evalE2 == null) {
null
} else {
ordering.lteq(evalE1, evalE2)
}
}
}
} }
case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison { case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
def symbol = ">" def symbol = ">"
override def eval(input: Row): Any = c2(input, left, right, _.gt(_, _))
lazy val ordering = {
if (left.dataType != right.dataType) {
throw new TreeNodeException(this,
s"Types do not match ${left.dataType} != ${right.dataType}")
}
left.dataType match {
case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
case other => sys.error(s"Type $other does not support ordered operations")
}
}
override def eval(input: Row): Any = {
val evalE1 = left.eval(input)
if(evalE1 == null) {
null
} else {
val evalE2 = right.eval(input)
if (evalE2 == null) {
null
} else {
ordering.gt(evalE1, evalE2)
}
}
}
} }
case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison { case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
def symbol = ">=" def symbol = ">="
override def eval(input: Row): Any = c2(input, left, right, _.gteq(_, _))
lazy val ordering = {
if (left.dataType != right.dataType) {
throw new TreeNodeException(this,
s"Types do not match ${left.dataType} != ${right.dataType}")
}
left.dataType match {
case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
case other => sys.error(s"Type $other does not support ordered operations")
}
}
override def eval(input: Row): Any = {
val evalE1 = left.eval(input)
if(evalE1 == null) {
null
} else {
val evalE2 = right.eval(input)
if (evalE2 == null) {
null
} else {
ordering.gteq(evalE1, evalE2)
}
}
}
} }
case class If(predicate: Expression, trueValue: Expression, falseValue: Expression) case class If(predicate: Expression, trueValue: Expression, falseValue: Expression)
......
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
File mode changed from 100755 to 100644
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment