From 2f2686a73f5a2a53ca5b1023e0d7e0e6c9be5896 Mon Sep 17 00:00:00 2001 From: Reynold Xin <rxin@databricks.com> Date: Wed, 19 Aug 2015 17:35:41 -0700 Subject: [PATCH] [SPARK-9242] [SQL] Audit UDAF interface. A few minor changes: 1. Improved documentation 2. Rename apply(distinct....) to distinct. 3. Changed MutableAggregationBuffer from a trait to an abstract class. 4. Renamed returnDataType to dataType to be more consistent with other expressions. And unrelated to UDAFs: 1. Renamed file names in expressions to use suffix "Expressions" to be more consistent. 2. Moved regexp related expressions out to its own file. 3. Renamed StringComparison => StringPredicate. Author: Reynold Xin <rxin@databricks.com> Closes #8321 from rxin/SPARK-9242. --- ...bitwise.scala => bitwiseExpressions.scala} | 0 ...als.scala => conditionalExpressions.scala} | 0 ...ctions.scala => datetimeExpressions.scala} | 0 ...nctions.scala => decimalExpressions.scala} | 0 ...nFunctions.scala => jsonExpressions.scala} | 0 .../{math.scala => mathExpressions.scala} | 0 ...lFunctions.scala => nullExpressions.scala} | 0 .../{random.scala => randomExpressions.scala} | 0 .../expressions/regexpExpressions.scala | 346 ++++++++++++++++++ ...erations.scala => stringExpressions.scala} | 332 +---------------- .../sql/catalyst/optimizer/Optimizer.scala | 2 +- .../expressions/StringExpressionsSuite.scala | 2 +- .../apache/spark/sql/UDFRegistration.scala | 1 + .../spark/sql/execution/aggregate/udaf.scala | 2 +- .../apache/spark/sql/expressions/udaf.scala | 44 ++- .../spark/sql/hive/JavaDataFrameSuite.java | 2 +- .../spark/sql/hive/aggregate/MyDoubleAvg.java | 2 +- .../spark/sql/hive/aggregate/MyDoubleSum.java | 2 +- 18 files changed, 386 insertions(+), 349 deletions(-) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/{bitwise.scala => bitwiseExpressions.scala} (100%) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/{conditionals.scala => conditionalExpressions.scala} (100%) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/{datetimeFunctions.scala => datetimeExpressions.scala} (100%) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/{decimalFunctions.scala => decimalExpressions.scala} (100%) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/{jsonFunctions.scala => jsonExpressions.scala} (100%) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/{math.scala => mathExpressions.scala} (100%) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/{nullFunctions.scala => nullExpressions.scala} (100%) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/{random.scala => randomExpressions.scala} (100%) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/{stringOperations.scala => stringExpressions.scala} (74%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/bitwise.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/bitwiseExpressions.scala similarity index 100% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/bitwise.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/bitwiseExpressions.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala similarity index 100% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala similarity index 100% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala similarity index 100% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala similarity index 100% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonFunctions.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala similarity index 100% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala similarity index 100% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala similarity index 100% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala new file mode 100644 index 0000000000..6dff28a7cd --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import java.util.regex.{MatchResult, Pattern} + +import org.apache.commons.lang3.StringEscapeUtils + +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.util.StringUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + + +trait StringRegexExpression extends ImplicitCastInputTypes { + self: BinaryExpression => + + def escape(v: String): String + def matches(regex: Pattern, str: String): Boolean + + override def dataType: DataType = BooleanType + override def inputTypes: Seq[DataType] = Seq(StringType, StringType) + + // try cache the pattern for Literal + private lazy val cache: Pattern = right match { + case x @ Literal(value: String, StringType) => compile(value) + case _ => null + } + + protected def compile(str: String): Pattern = if (str == null) { + null + } else { + // Let it raise exception if couldn't compile the regex string + Pattern.compile(escape(str)) + } + + protected def pattern(str: String) = if (cache == null) compile(str) else cache + + protected override def nullSafeEval(input1: Any, input2: Any): Any = { + val regex = pattern(input2.asInstanceOf[UTF8String].toString) + if(regex == null) { + null + } else { + matches(regex, input1.asInstanceOf[UTF8String].toString) + } + } +} + + +/** + * Simple RegEx pattern matching function + */ +case class Like(left: Expression, right: Expression) + extends BinaryExpression with StringRegexExpression with CodegenFallback { + + override def escape(v: String): String = StringUtils.escapeLikeRegex(v) + + override def matches(regex: Pattern, str: String): Boolean = regex.matcher(str).matches() + + override def toString: String = s"$left LIKE $right" + + override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + val patternClass = classOf[Pattern].getName + val escapeFunc = StringUtils.getClass.getName.stripSuffix("$") + ".escapeLikeRegex" + val pattern = ctx.freshName("pattern") + + if (right.foldable) { + val rVal = right.eval() + if (rVal != null) { + val regexStr = + StringEscapeUtils.escapeJava(escape(rVal.asInstanceOf[UTF8String].toString())) + ctx.addMutableState(patternClass, pattern, + s"""$pattern = ${patternClass}.compile("$regexStr");""") + + // We don't use nullSafeCodeGen here because we don't want to re-evaluate right again. + val eval = left.gen(ctx) + s""" + ${eval.code} + boolean ${ev.isNull} = ${eval.isNull}; + ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)}; + if (!${ev.isNull}) { + ${ev.primitive} = $pattern.matcher(${eval.primitive}.toString()).matches(); + } + """ + } else { + s""" + boolean ${ev.isNull} = true; + ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)}; + """ + } + } else { + nullSafeCodeGen(ctx, ev, (eval1, eval2) => { + s""" + String rightStr = ${eval2}.toString(); + ${patternClass} $pattern = ${patternClass}.compile($escapeFunc(rightStr)); + ${ev.primitive} = $pattern.matcher(${eval1}.toString()).matches(); + """ + }) + } + } +} + + +case class RLike(left: Expression, right: Expression) + extends BinaryExpression with StringRegexExpression with CodegenFallback { + + override def escape(v: String): String = v + override def matches(regex: Pattern, str: String): Boolean = regex.matcher(str).find(0) + override def toString: String = s"$left RLIKE $right" + + override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + val patternClass = classOf[Pattern].getName + val pattern = ctx.freshName("pattern") + + if (right.foldable) { + val rVal = right.eval() + if (rVal != null) { + val regexStr = + StringEscapeUtils.escapeJava(rVal.asInstanceOf[UTF8String].toString()) + ctx.addMutableState(patternClass, pattern, + s"""$pattern = ${patternClass}.compile("$regexStr");""") + + // We don't use nullSafeCodeGen here because we don't want to re-evaluate right again. + val eval = left.gen(ctx) + s""" + ${eval.code} + boolean ${ev.isNull} = ${eval.isNull}; + ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)}; + if (!${ev.isNull}) { + ${ev.primitive} = $pattern.matcher(${eval.primitive}.toString()).find(0); + } + """ + } else { + s""" + boolean ${ev.isNull} = true; + ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)}; + """ + } + } else { + nullSafeCodeGen(ctx, ev, (eval1, eval2) => { + s""" + String rightStr = ${eval2}.toString(); + ${patternClass} $pattern = ${patternClass}.compile(rightStr); + ${ev.primitive} = $pattern.matcher(${eval1}.toString()).find(0); + """ + }) + } + } +} + + +/** + * Splits str around pat (pattern is a regular expression). + */ +case class StringSplit(str: Expression, pattern: Expression) + extends BinaryExpression with ImplicitCastInputTypes { + + override def left: Expression = str + override def right: Expression = pattern + override def dataType: DataType = ArrayType(StringType) + override def inputTypes: Seq[DataType] = Seq(StringType, StringType) + + override def nullSafeEval(string: Any, regex: Any): Any = { + val strings = string.asInstanceOf[UTF8String].split(regex.asInstanceOf[UTF8String], -1) + new GenericArrayData(strings.asInstanceOf[Array[Any]]) + } + + override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + val arrayClass = classOf[GenericArrayData].getName + nullSafeCodeGen(ctx, ev, (str, pattern) => + // Array in java is covariant, so we don't need to cast UTF8String[] to Object[]. + s"""${ev.primitive} = new $arrayClass($str.split($pattern, -1));""") + } + + override def prettyName: String = "split" +} + + +/** + * Replace all substrings of str that match regexp with rep. + * + * NOTE: this expression is not THREAD-SAFE, as it has some internal mutable status. + */ +case class RegExpReplace(subject: Expression, regexp: Expression, rep: Expression) + extends TernaryExpression with ImplicitCastInputTypes { + + // last regex in string, we will update the pattern iff regexp value changed. + @transient private var lastRegex: UTF8String = _ + // last regex pattern, we cache it for performance concern + @transient private var pattern: Pattern = _ + // last replacement string, we don't want to convert a UTF8String => java.langString every time. + @transient private var lastReplacement: String = _ + @transient private var lastReplacementInUTF8: UTF8String = _ + // result buffer write by Matcher + @transient private val result: StringBuffer = new StringBuffer + + override def nullSafeEval(s: Any, p: Any, r: Any): Any = { + if (!p.equals(lastRegex)) { + // regex value changed + lastRegex = p.asInstanceOf[UTF8String].clone() + pattern = Pattern.compile(lastRegex.toString) + } + if (!r.equals(lastReplacementInUTF8)) { + // replacement string changed + lastReplacementInUTF8 = r.asInstanceOf[UTF8String].clone() + lastReplacement = lastReplacementInUTF8.toString + } + val m = pattern.matcher(s.toString()) + result.delete(0, result.length()) + + while (m.find) { + m.appendReplacement(result, lastReplacement) + } + m.appendTail(result) + + UTF8String.fromString(result.toString) + } + + override def dataType: DataType = StringType + override def inputTypes: Seq[AbstractDataType] = Seq(StringType, StringType, StringType) + override def children: Seq[Expression] = subject :: regexp :: rep :: Nil + override def prettyName: String = "regexp_replace" + + override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + val termLastRegex = ctx.freshName("lastRegex") + val termPattern = ctx.freshName("pattern") + + val termLastReplacement = ctx.freshName("lastReplacement") + val termLastReplacementInUTF8 = ctx.freshName("lastReplacementInUTF8") + + val termResult = ctx.freshName("result") + + val classNamePattern = classOf[Pattern].getCanonicalName + val classNameStringBuffer = classOf[java.lang.StringBuffer].getCanonicalName + + ctx.addMutableState("UTF8String", termLastRegex, s"${termLastRegex} = null;") + ctx.addMutableState(classNamePattern, termPattern, s"${termPattern} = null;") + ctx.addMutableState("String", termLastReplacement, s"${termLastReplacement} = null;") + ctx.addMutableState("UTF8String", + termLastReplacementInUTF8, s"${termLastReplacementInUTF8} = null;") + ctx.addMutableState(classNameStringBuffer, + termResult, s"${termResult} = new $classNameStringBuffer();") + + nullSafeCodeGen(ctx, ev, (subject, regexp, rep) => { + s""" + if (!$regexp.equals(${termLastRegex})) { + // regex value changed + ${termLastRegex} = $regexp.clone(); + ${termPattern} = ${classNamePattern}.compile(${termLastRegex}.toString()); + } + if (!$rep.equals(${termLastReplacementInUTF8})) { + // replacement string changed + ${termLastReplacementInUTF8} = $rep.clone(); + ${termLastReplacement} = ${termLastReplacementInUTF8}.toString(); + } + ${termResult}.delete(0, ${termResult}.length()); + java.util.regex.Matcher m = ${termPattern}.matcher($subject.toString()); + + while (m.find()) { + m.appendReplacement(${termResult}, ${termLastReplacement}); + } + m.appendTail(${termResult}); + ${ev.primitive} = UTF8String.fromString(${termResult}.toString()); + ${ev.isNull} = false; + """ + }) + } +} + +/** + * Extract a specific(idx) group identified by a Java regex. + * + * NOTE: this expression is not THREAD-SAFE, as it has some internal mutable status. + */ +case class RegExpExtract(subject: Expression, regexp: Expression, idx: Expression) + extends TernaryExpression with ImplicitCastInputTypes { + def this(s: Expression, r: Expression) = this(s, r, Literal(1)) + + // last regex in string, we will update the pattern iff regexp value changed. + @transient private var lastRegex: UTF8String = _ + // last regex pattern, we cache it for performance concern + @transient private var pattern: Pattern = _ + + override def nullSafeEval(s: Any, p: Any, r: Any): Any = { + if (!p.equals(lastRegex)) { + // regex value changed + lastRegex = p.asInstanceOf[UTF8String].clone() + pattern = Pattern.compile(lastRegex.toString) + } + val m = pattern.matcher(s.toString) + if (m.find) { + val mr: MatchResult = m.toMatchResult + UTF8String.fromString(mr.group(r.asInstanceOf[Int])) + } else { + UTF8String.EMPTY_UTF8 + } + } + + override def dataType: DataType = StringType + override def inputTypes: Seq[AbstractDataType] = Seq(StringType, StringType, IntegerType) + override def children: Seq[Expression] = subject :: regexp :: idx :: Nil + override def prettyName: String = "regexp_extract" + + override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + val termLastRegex = ctx.freshName("lastRegex") + val termPattern = ctx.freshName("pattern") + val classNamePattern = classOf[Pattern].getCanonicalName + + ctx.addMutableState("UTF8String", termLastRegex, s"${termLastRegex} = null;") + ctx.addMutableState(classNamePattern, termPattern, s"${termPattern} = null;") + + nullSafeCodeGen(ctx, ev, (subject, regexp, idx) => { + s""" + if (!$regexp.equals(${termLastRegex})) { + // regex value changed + ${termLastRegex} = $regexp.clone(); + ${termPattern} = ${classNamePattern}.compile(${termLastRegex}.toString()); + } + java.util.regex.Matcher m = + ${termPattern}.matcher($subject.toString()); + if (m.find()) { + java.util.regex.MatchResult mr = m.toMatchResult(); + ${ev.primitive} = UTF8String.fromString(mr.group($idx)); + ${ev.isNull} = false; + } else { + ${ev.primitive} = UTF8String.EMPTY_UTF8; + ${ev.isNull} = false; + }""" + }) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala similarity index 74% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index ca044d3e95..3c23f2ecfb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -21,13 +21,9 @@ import java.text.DecimalFormat import java.util.Arrays import java.util.{Map => JMap, HashMap} import java.util.Locale -import java.util.regex.{MatchResult, Pattern} - -import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen._ -import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -124,143 +120,6 @@ case class ConcatWs(children: Seq[Expression]) } } - -trait StringRegexExpression extends ImplicitCastInputTypes { - self: BinaryExpression => - - def escape(v: String): String - def matches(regex: Pattern, str: String): Boolean - - override def dataType: DataType = BooleanType - override def inputTypes: Seq[DataType] = Seq(StringType, StringType) - - // try cache the pattern for Literal - private lazy val cache: Pattern = right match { - case x @ Literal(value: String, StringType) => compile(value) - case _ => null - } - - protected def compile(str: String): Pattern = if (str == null) { - null - } else { - // Let it raise exception if couldn't compile the regex string - Pattern.compile(escape(str)) - } - - protected def pattern(str: String) = if (cache == null) compile(str) else cache - - protected override def nullSafeEval(input1: Any, input2: Any): Any = { - val regex = pattern(input2.asInstanceOf[UTF8String].toString()) - if(regex == null) { - null - } else { - matches(regex, input1.asInstanceOf[UTF8String].toString()) - } - } -} - -/** - * Simple RegEx pattern matching function - */ -case class Like(left: Expression, right: Expression) - extends BinaryExpression with StringRegexExpression with CodegenFallback { - - override def escape(v: String): String = StringUtils.escapeLikeRegex(v) - - override def matches(regex: Pattern, str: String): Boolean = regex.matcher(str).matches() - - override def toString: String = s"$left LIKE $right" - - override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { - val patternClass = classOf[Pattern].getName - val escapeFunc = StringUtils.getClass.getName.stripSuffix("$") + ".escapeLikeRegex" - val pattern = ctx.freshName("pattern") - - if (right.foldable) { - val rVal = right.eval() - if (rVal != null) { - val regexStr = - StringEscapeUtils.escapeJava(escape(rVal.asInstanceOf[UTF8String].toString())) - ctx.addMutableState(patternClass, pattern, - s"""$pattern = ${patternClass}.compile("$regexStr");""") - - // We don't use nullSafeCodeGen here because we don't want to re-evaluate right again. - val eval = left.gen(ctx) - s""" - ${eval.code} - boolean ${ev.isNull} = ${eval.isNull}; - ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)}; - if (!${ev.isNull}) { - ${ev.primitive} = $pattern.matcher(${eval.primitive}.toString()).matches(); - } - """ - } else { - s""" - boolean ${ev.isNull} = true; - ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)}; - """ - } - } else { - nullSafeCodeGen(ctx, ev, (eval1, eval2) => { - s""" - String rightStr = ${eval2}.toString(); - ${patternClass} $pattern = ${patternClass}.compile($escapeFunc(rightStr)); - ${ev.primitive} = $pattern.matcher(${eval1}.toString()).matches(); - """ - }) - } - } -} - - -case class RLike(left: Expression, right: Expression) - extends BinaryExpression with StringRegexExpression with CodegenFallback { - - override def escape(v: String): String = v - override def matches(regex: Pattern, str: String): Boolean = regex.matcher(str).find(0) - override def toString: String = s"$left RLIKE $right" - - override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { - val patternClass = classOf[Pattern].getName - val pattern = ctx.freshName("pattern") - - if (right.foldable) { - val rVal = right.eval() - if (rVal != null) { - val regexStr = - StringEscapeUtils.escapeJava(rVal.asInstanceOf[UTF8String].toString()) - ctx.addMutableState(patternClass, pattern, - s"""$pattern = ${patternClass}.compile("$regexStr");""") - - // We don't use nullSafeCodeGen here because we don't want to re-evaluate right again. - val eval = left.gen(ctx) - s""" - ${eval.code} - boolean ${ev.isNull} = ${eval.isNull}; - ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)}; - if (!${ev.isNull}) { - ${ev.primitive} = $pattern.matcher(${eval.primitive}.toString()).find(0); - } - """ - } else { - s""" - boolean ${ev.isNull} = true; - ${ctx.javaType(dataType)} ${ev.primitive} = ${ctx.defaultValue(dataType)}; - """ - } - } else { - nullSafeCodeGen(ctx, ev, (eval1, eval2) => { - s""" - String rightStr = ${eval2}.toString(); - ${patternClass} $pattern = ${patternClass}.compile(rightStr); - ${ev.primitive} = $pattern.matcher(${eval1}.toString()).find(0); - """ - }) - } - } -} - - trait String2StringExpression extends ImplicitCastInputTypes { self: UnaryExpression => @@ -305,7 +164,7 @@ case class Lower(child: Expression) extends UnaryExpression with String2StringEx } /** A base trait for functions that compare two strings, returning a boolean. */ -trait StringComparison extends ImplicitCastInputTypes { +trait StringPredicate extends Predicate with ImplicitCastInputTypes { self: BinaryExpression => def compare(l: UTF8String, r: UTF8String): Boolean @@ -322,7 +181,7 @@ trait StringComparison extends ImplicitCastInputTypes { * A function that returns true if the string `left` contains the string `right`. */ case class Contains(left: Expression, right: Expression) - extends BinaryExpression with Predicate with StringComparison { + extends BinaryExpression with StringPredicate { override def compare(l: UTF8String, r: UTF8String): Boolean = l.contains(r) override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { defineCodeGen(ctx, ev, (c1, c2) => s"($c1).contains($c2)") @@ -333,7 +192,7 @@ case class Contains(left: Expression, right: Expression) * A function that returns true if the string `left` starts with the string `right`. */ case class StartsWith(left: Expression, right: Expression) - extends BinaryExpression with Predicate with StringComparison { + extends BinaryExpression with StringPredicate { override def compare(l: UTF8String, r: UTF8String): Boolean = l.startsWith(r) override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { defineCodeGen(ctx, ev, (c1, c2) => s"($c1).startsWith($c2)") @@ -344,7 +203,7 @@ case class StartsWith(left: Expression, right: Expression) * A function that returns true if the string `left` ends with the string `right`. */ case class EndsWith(left: Expression, right: Expression) - extends BinaryExpression with Predicate with StringComparison { + extends BinaryExpression with StringPredicate { override def compare(l: UTF8String, r: UTF8String): Boolean = l.endsWith(r) override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { defineCodeGen(ctx, ev, (c1, c2) => s"($c1).endsWith($c2)") @@ -769,32 +628,6 @@ case class StringSpace(child: Expression) override def prettyName: String = "space" } -/** - * Splits str around pat (pattern is a regular expression). - */ -case class StringSplit(str: Expression, pattern: Expression) - extends BinaryExpression with ImplicitCastInputTypes { - - override def left: Expression = str - override def right: Expression = pattern - override def dataType: DataType = ArrayType(StringType) - override def inputTypes: Seq[DataType] = Seq(StringType, StringType) - - override def nullSafeEval(string: Any, regex: Any): Any = { - val strings = string.asInstanceOf[UTF8String].split(regex.asInstanceOf[UTF8String], -1) - new GenericArrayData(strings.asInstanceOf[Array[Any]]) - } - - override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { - val arrayClass = classOf[GenericArrayData].getName - nullSafeCodeGen(ctx, ev, (str, pattern) => - // Array in java is covariant, so we don't need to cast UTF8String[] to Object[]. - s"""${ev.primitive} = new $arrayClass($str.split($pattern, -1));""") - } - - override def prettyName: String = "split" -} - object Substring { def subStringBinarySQL(bytes: Array[Byte], pos: Int, len: Int): Array[Byte] = { if (pos > bytes.length) { @@ -1048,163 +881,6 @@ case class Encode(value: Expression, charset: Expression) } } -/** - * Replace all substrings of str that match regexp with rep. - * - * NOTE: this expression is not THREAD-SAFE, as it has some internal mutable status. - */ -case class RegExpReplace(subject: Expression, regexp: Expression, rep: Expression) - extends TernaryExpression with ImplicitCastInputTypes { - - // last regex in string, we will update the pattern iff regexp value changed. - @transient private var lastRegex: UTF8String = _ - // last regex pattern, we cache it for performance concern - @transient private var pattern: Pattern = _ - // last replacement string, we don't want to convert a UTF8String => java.langString every time. - @transient private var lastReplacement: String = _ - @transient private var lastReplacementInUTF8: UTF8String = _ - // result buffer write by Matcher - @transient private val result: StringBuffer = new StringBuffer - - override def nullSafeEval(s: Any, p: Any, r: Any): Any = { - if (!p.equals(lastRegex)) { - // regex value changed - lastRegex = p.asInstanceOf[UTF8String].clone() - pattern = Pattern.compile(lastRegex.toString) - } - if (!r.equals(lastReplacementInUTF8)) { - // replacement string changed - lastReplacementInUTF8 = r.asInstanceOf[UTF8String].clone() - lastReplacement = lastReplacementInUTF8.toString - } - val m = pattern.matcher(s.toString()) - result.delete(0, result.length()) - - while (m.find) { - m.appendReplacement(result, lastReplacement) - } - m.appendTail(result) - - UTF8String.fromString(result.toString) - } - - override def dataType: DataType = StringType - override def inputTypes: Seq[AbstractDataType] = Seq(StringType, StringType, StringType) - override def children: Seq[Expression] = subject :: regexp :: rep :: Nil - override def prettyName: String = "regexp_replace" - - override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { - val termLastRegex = ctx.freshName("lastRegex") - val termPattern = ctx.freshName("pattern") - - val termLastReplacement = ctx.freshName("lastReplacement") - val termLastReplacementInUTF8 = ctx.freshName("lastReplacementInUTF8") - - val termResult = ctx.freshName("result") - - val classNamePattern = classOf[Pattern].getCanonicalName - val classNameStringBuffer = classOf[java.lang.StringBuffer].getCanonicalName - - ctx.addMutableState("UTF8String", - termLastRegex, s"${termLastRegex} = null;") - ctx.addMutableState(classNamePattern, - termPattern, s"${termPattern} = null;") - ctx.addMutableState("String", - termLastReplacement, s"${termLastReplacement} = null;") - ctx.addMutableState("UTF8String", - termLastReplacementInUTF8, s"${termLastReplacementInUTF8} = null;") - ctx.addMutableState(classNameStringBuffer, - termResult, s"${termResult} = new $classNameStringBuffer();") - - nullSafeCodeGen(ctx, ev, (subject, regexp, rep) => { - s""" - if (!$regexp.equals(${termLastRegex})) { - // regex value changed - ${termLastRegex} = $regexp.clone(); - ${termPattern} = ${classNamePattern}.compile(${termLastRegex}.toString()); - } - if (!$rep.equals(${termLastReplacementInUTF8})) { - // replacement string changed - ${termLastReplacementInUTF8} = $rep.clone(); - ${termLastReplacement} = ${termLastReplacementInUTF8}.toString(); - } - ${termResult}.delete(0, ${termResult}.length()); - java.util.regex.Matcher m = ${termPattern}.matcher($subject.toString()); - - while (m.find()) { - m.appendReplacement(${termResult}, ${termLastReplacement}); - } - m.appendTail(${termResult}); - ${ev.primitive} = UTF8String.fromString(${termResult}.toString()); - ${ev.isNull} = false; - """ - }) - } -} - -/** - * Extract a specific(idx) group identified by a Java regex. - * - * NOTE: this expression is not THREAD-SAFE, as it has some internal mutable status. - */ -case class RegExpExtract(subject: Expression, regexp: Expression, idx: Expression) - extends TernaryExpression with ImplicitCastInputTypes { - def this(s: Expression, r: Expression) = this(s, r, Literal(1)) - - // last regex in string, we will update the pattern iff regexp value changed. - @transient private var lastRegex: UTF8String = _ - // last regex pattern, we cache it for performance concern - @transient private var pattern: Pattern = _ - - override def nullSafeEval(s: Any, p: Any, r: Any): Any = { - if (!p.equals(lastRegex)) { - // regex value changed - lastRegex = p.asInstanceOf[UTF8String].clone() - pattern = Pattern.compile(lastRegex.toString) - } - val m = pattern.matcher(s.toString()) - if (m.find) { - val mr: MatchResult = m.toMatchResult - UTF8String.fromString(mr.group(r.asInstanceOf[Int])) - } else { - UTF8String.EMPTY_UTF8 - } - } - - override def dataType: DataType = StringType - override def inputTypes: Seq[AbstractDataType] = Seq(StringType, StringType, IntegerType) - override def children: Seq[Expression] = subject :: regexp :: idx :: Nil - override def prettyName: String = "regexp_extract" - - override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { - val termLastRegex = ctx.freshName("lastRegex") - val termPattern = ctx.freshName("pattern") - val classNamePattern = classOf[Pattern].getCanonicalName - - ctx.addMutableState("UTF8String", termLastRegex, s"${termLastRegex} = null;") - ctx.addMutableState(classNamePattern, termPattern, s"${termPattern} = null;") - - nullSafeCodeGen(ctx, ev, (subject, regexp, idx) => { - s""" - if (!$regexp.equals(${termLastRegex})) { - // regex value changed - ${termLastRegex} = $regexp.clone(); - ${termPattern} = ${classNamePattern}.compile(${termLastRegex}.toString()); - } - java.util.regex.Matcher m = - ${termPattern}.matcher($subject.toString()); - if (m.find()) { - java.util.regex.MatchResult mr = m.toMatchResult(); - ${ev.primitive} = UTF8String.fromString(mr.group($idx)); - ${ev.isNull} = false; - } else { - ${ev.primitive} = UTF8String.EMPTY_UTF8; - ${ev.isNull} = false; - }""" - }) - } -} - /** * Formats the number X to a format like '#,###,###.##', rounded to D decimal places, * and returns the result as a string. If D is 0, the result has no decimal point or diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 42457d5318..854463dd11 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -372,7 +372,7 @@ object NullPropagation extends Rule[LogicalPlan] { case _ => e } - case e: StringComparison => e.children match { + case e: StringPredicate => e.children match { case Literal(null, _) :: right :: Nil => Literal.create(null, e.dataType) case left :: Literal(null, _) :: Nil => Literal.create(null, e.dataType) case _ => e diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala index 426dc27247..99e3b13ce8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala @@ -673,7 +673,7 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(Length(Literal.create(null, BinaryType)), null, create_row(bytes)) } - test("number format") { + test("format_number / FormatNumber") { checkEvaluation(FormatNumber(Literal(4.asInstanceOf[Byte]), Literal(3)), "4.000") checkEvaluation(FormatNumber(Literal(4.asInstanceOf[Short]), Literal(3)), "4.000") checkEvaluation(FormatNumber(Literal(4.0f), Literal(3)), "4.000") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala index 1f270560d7..fc4d0938c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala @@ -56,6 +56,7 @@ class UDFRegistration private[sql] (sqlContext: SQLContext) extends Logging { /** * Register a user-defined aggregate function (UDAF). + * * @param name the name of the UDAF. * @param udaf the UDAF needs to be registered. * @return the registered UDAF. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala index 7619f3ec9f..d43d3dd9ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala @@ -304,7 +304,7 @@ private[sql] case class ScalaUDAF( override def nullable: Boolean = true - override def dataType: DataType = udaf.returnDataType + override def dataType: DataType = udaf.dataType override def deterministic: Boolean = udaf.deterministic diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala index 5180871585..258afadc76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.expressions -import org.apache.spark.sql.catalyst.expressions.ScalaUDF import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, AggregateExpression2} import org.apache.spark.sql.execution.aggregate.ScalaUDAF import org.apache.spark.sql.{Column, Row} @@ -26,7 +25,7 @@ import org.apache.spark.annotation.Experimental /** * :: Experimental :: - * The abstract class for implementing user-defined aggregate functions. + * The base class for implementing user-defined aggregate functions (UDAF). */ @Experimental abstract class UserDefinedAggregateFunction extends Serializable { @@ -67,22 +66,35 @@ abstract class UserDefinedAggregateFunction extends Serializable { /** * The [[DataType]] of the returned value of this [[UserDefinedAggregateFunction]]. */ - def returnDataType: DataType + def dataType: DataType - /** Indicates if this function is deterministic. */ + /** + * Returns true iff this function is deterministic, i.e. given the same input, + * always return the same output. + */ def deterministic: Boolean /** - * Initializes the given aggregation buffer. Initial values set by this method should satisfy - * the condition that when merging two buffers with initial values, the new buffer - * still store initial values. + * Initializes the given aggregation buffer, i.e. the zero value of the aggregation buffer. + * + * The contract should be that applying the merge function on two initial buffers should just + * return the initial buffer itself, i.e. + * `merge(initialBuffer, initialBuffer)` should equal `initialBuffer`. */ def initialize(buffer: MutableAggregationBuffer): Unit - /** Updates the given aggregation buffer `buffer` with new input data from `input`. */ + /** + * Updates the given aggregation buffer `buffer` with new input data from `input`. + * + * This is called once per input row. + */ def update(buffer: MutableAggregationBuffer, input: Row): Unit - /** Merges two aggregation buffers and stores the updated buffer values back to `buffer1`. */ + /** + * Merges two aggregation buffers and stores the updated buffer values back to `buffer1`. + * + * This is called when we merge two partially aggregated data together. + */ def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit /** @@ -92,7 +104,7 @@ abstract class UserDefinedAggregateFunction extends Serializable { def evaluate(buffer: Row): Any /** - * Creates a [[Column]] for this UDAF with given [[Column]]s as arguments. + * Creates a [[Column]] for this UDAF using given [[Column]]s as input arguments. */ @scala.annotation.varargs def apply(exprs: Column*): Column = { @@ -105,16 +117,16 @@ abstract class UserDefinedAggregateFunction extends Serializable { } /** - * Creates a [[Column]] for this UDAF with given [[Column]]s as arguments. - * If `isDistinct` is true, this UDAF is working on distinct input values. + * Creates a [[Column]] for this UDAF using the distinct values of the given + * [[Column]]s as input arguments. */ @scala.annotation.varargs - def apply(isDistinct: Boolean, exprs: Column*): Column = { + def distinct(exprs: Column*): Column = { val aggregateExpression = AggregateExpression2( ScalaUDAF(exprs.map(_.expr), this), Complete, - isDistinct = isDistinct) + isDistinct = true) Column(aggregateExpression) } } @@ -122,9 +134,11 @@ abstract class UserDefinedAggregateFunction extends Serializable { /** * :: Experimental :: * A [[Row]] representing an mutable aggregation buffer. + * + * This is not meant to be extended outside of Spark. */ @Experimental -trait MutableAggregationBuffer extends Row { +abstract class MutableAggregationBuffer extends Row { /** Update the ith value of this buffer. */ def update(i: Int, value: Any): Unit diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java index 21b053f07a..a30dfa554e 100644 --- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java +++ b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/JavaDataFrameSuite.java @@ -92,7 +92,7 @@ public class JavaDataFrameSuite { DataFrame aggregatedDF = df.groupBy() .agg( - udaf.apply(true, col("value")), + udaf.distinct(col("value")), udaf.apply(col("value")), registeredUDAF.apply(col("value")), callUDF("mydoublesum", col("value"))); diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleAvg.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleAvg.java index a2247e3da1..2961b803f1 100644 --- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleAvg.java +++ b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleAvg.java @@ -65,7 +65,7 @@ public class MyDoubleAvg extends UserDefinedAggregateFunction { return _bufferSchema; } - @Override public DataType returnDataType() { + @Override public DataType dataType() { return _returnDataType; } diff --git a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java index da29e24d26..c71882a6e7 100644 --- a/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java +++ b/sql/hive/src/test/java/test/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java @@ -60,7 +60,7 @@ public class MyDoubleSum extends UserDefinedAggregateFunction { return _bufferSchema; } - @Override public DataType returnDataType() { + @Override public DataType dataType() { return _returnDataType; } -- GitLab