From 2c59d5c12a0a02702839bfaf631505b8a311c5a9 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh <viirya@gmail.com> Date: Fri, 19 Jun 2015 10:09:31 -0700 Subject: [PATCH] [SPARK-8207] [SQL] Add math function bin JIRA: https://issues.apache.org/jira/browse/SPARK-8207 Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #6721 from viirya/expr_bin and squashes the following commits: 07e1c8f [Liang-Chi Hsieh] Remove AbstractUnaryMathExpression and let BIN inherit UnaryExpression. 0677f1a [Liang-Chi Hsieh] For comments. cf62b95 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin 0cf20f2 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin dea9c12 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin d4f4774 [Liang-Chi Hsieh] Add @ignore_unicode_prefix. 7a0196f [Liang-Chi Hsieh] Fix python style. ac2bacd [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin a0a2d0f [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin 4cb764d [Liang-Chi Hsieh] For comments. 0f78682 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin c0c3197 [Liang-Chi Hsieh] Add bin to FunctionRegistry. 824f761 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into expr_bin 50e0c3b [Liang-Chi Hsieh] Add math function bin(a: long): string. --- python/pyspark/sql/functions.py | 14 ++++++++ .../catalyst/analysis/FunctionRegistry.scala | 1 + .../spark/sql/catalyst/expressions/math.scala | 33 +++++++++++++++++- .../expressions/MathFunctionsSuite.scala | 34 +++++++++++++++---- .../org/apache/spark/sql/functions.scala | 18 ++++++++++ .../spark/sql/DataFrameFunctionsSuite.scala | 10 ++++++ 6 files changed, 102 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index acdb01d3d3..cfa87aeea1 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -35,6 +35,7 @@ from pyspark.sql.column import Column, _to_java_column, _to_seq __all__ = [ 'array', 'approxCountDistinct', + 'bin', 'coalesce', 'countDistinct', 'explode', @@ -231,6 +232,19 @@ def approxCountDistinct(col, rsd=None): return Column(jc) +@ignore_unicode_prefix +@since(1.5) +def bin(col): + """Returns the string representation of the binary value of the given column. + + >>> df.select(bin(df.age).alias('c')).collect() + [Row(c=u'10'), Row(c=u'101')] + """ + sc = SparkContext._active_spark_context + jc = sc._jvm.functions.bin(_to_java_column(col)) + return Column(jc) + + @since(1.4) def coalesce(*cols): """Returns the first column that is not null. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 13b2bb05f5..79273a7840 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -103,6 +103,7 @@ object FunctionRegistry { expression[Asin]("asin"), expression[Atan]("atan"), expression[Atan2]("atan2"), + expression[Bin]("bin"), expression[Cbrt]("cbrt"), expression[Ceil]("ceil"), expression[Ceil]("ceiling"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala index f79bf4aee0..250564dc4b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala @@ -17,9 +17,12 @@ package org.apache.spark.sql.catalyst.expressions +import java.lang.{Long => JLong} + import org.apache.spark.sql.catalyst import org.apache.spark.sql.catalyst.expressions.codegen._ -import org.apache.spark.sql.types.{DataType, DoubleType} +import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StringType} +import org.apache.spark.unsafe.types.UTF8String /** * A leaf expression specifically for math constants. Math constants expect no input. @@ -207,6 +210,34 @@ case class ToRadians(child: Expression) extends UnaryMathExpression(math.toRadia override def funcName: String = "toRadians" } +case class Bin(child: Expression) + extends UnaryExpression with Serializable with ExpectsInputTypes { + + val name: String = "BIN" + + override def foldable: Boolean = child.foldable + override def nullable: Boolean = true + override def toString: String = s"$name($child)" + + override def expectedChildTypes: Seq[DataType] = Seq(LongType) + override def dataType: DataType = StringType + + def funcName: String = name.toLowerCase + + override def eval(input: catalyst.InternalRow): Any = { + val evalE = child.eval(input) + if (evalE == null) { + null + } else { + UTF8String.fromString(JLong.toBinaryString(evalE.asInstanceOf[Long])) + } + } + + override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + defineCodeGen(ctx, ev, (c) => + s"${ctx.stringType}.fromString(java.lang.Long.toBinaryString($c))") + } +} //////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala index 21e9b92b72..0d1d5ebdff 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.types.DoubleType +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.types.{DataType, DoubleType, LongType} class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { @@ -41,16 +42,18 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { * Used for testing unary math expressions. * * @param c expression - * @param f The functions in scala.math + * @param f The functions in scala.math or elsewhere used to generate expected results * @param domain The set of values to run the function with * @param expectNull Whether the given values should return null or not * @tparam T Generic type for primitives + * @tparam U Generic type for the output of the given function `f` */ - private def testUnary[T]( + private def testUnary[T, U]( c: Expression => Expression, - f: T => T, + f: T => U, domain: Iterable[T] = (-20 to 20).map(_ * 0.1), - expectNull: Boolean = false): Unit = { + expectNull: Boolean = false, + evalType: DataType = DoubleType): Unit = { if (expectNull) { domain.foreach { value => checkEvaluation(c(Literal(value)), null, EmptyRow) @@ -60,7 +63,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(c(Literal(value)), f(value), EmptyRow) } } - checkEvaluation(c(Literal.create(null, DoubleType)), null, create_row(null)) + checkEvaluation(c(Literal.create(null, evalType)), null, create_row(null)) } /** @@ -168,7 +171,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("signum") { - testUnary[Double](Signum, math.signum) + testUnary[Double, Double](Signum, math.signum) } test("log") { @@ -186,6 +189,23 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { testUnary(Log1p, math.log1p, (-10 to -2).map(_ * 1.0), expectNull = true) } + test("bin") { + testUnary(Bin, java.lang.Long.toBinaryString, (-20 to 20).map(_.toLong), evalType = LongType) + + val row = create_row(null, 12L, 123L, 1234L, -123L) + val l1 = 'a.long.at(0) + val l2 = 'a.long.at(1) + val l3 = 'a.long.at(2) + val l4 = 'a.long.at(3) + val l5 = 'a.long.at(4) + + checkEvaluation(Bin(l1), null, row) + checkEvaluation(Bin(l2), java.lang.Long.toBinaryString(12), row) + checkEvaluation(Bin(l3), java.lang.Long.toBinaryString(123), row) + checkEvaluation(Bin(l4), java.lang.Long.toBinaryString(1234), row) + checkEvaluation(Bin(l5), java.lang.Long.toBinaryString(-123), row) + } + test("log2") { def f: (Double) => Double = (x: Double) => math.log(x) / math.log(2) testUnary(Log2, f, (0 to 20).map(_ * 0.1)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index d8a91bead7..40ae9f5df8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -889,6 +889,24 @@ object functions { */ def atan2(l: Double, rightName: String): Column = atan2(l, Column(rightName)) + /** + * An expression that returns the string representation of the binary value of the given long + * column. For example, bin("12") returns "1100". + * + * @group math_funcs + * @since 1.5.0 + */ + def bin(e: Column): Column = Bin(e.expr) + + /** + * An expression that returns the string representation of the binary value of the given long + * column. For example, bin("12") returns "1100". + * + * @group math_funcs + * @since 1.5.0 + */ + def bin(columnName: String): Column = bin(Column(columnName)) + /** * Computes the cube-root of the given value. * diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index cfd23867a9..70819fe287 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -110,6 +110,16 @@ class DataFrameFunctionsSuite extends QueryTest { testData2.collect().toSeq.map(r => Row(~r.getInt(0)))) } + test("bin") { + val df = Seq[(Integer, Integer)]((12, null)).toDF("a", "b") + checkAnswer( + df.select(bin("a"), bin("b")), + Row("1100", null)) + checkAnswer( + df.selectExpr("bin(a)", "bin(b)"), + Row("1100", null)) + } + test("if function") { val df = Seq((1, 2)).toDF("a", "b") checkAnswer( -- GitLab