diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 29ae47e842ddb46117686d499b377144ccf5bca3..3f72e6e184db1a442d84e6afd5fc43d29aaabdc1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -184,10 +184,10 @@ abstract class Expression extends TreeNode[Expression] { */ trait Unevaluable extends Expression { - override def eval(input: InternalRow = null): Any = + final override def eval(input: InternalRow = null): Any = throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") - override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = + final override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = throw new UnsupportedOperationException(s"Cannot evaluate expression: $this") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala index 577ede73cb01ff68c34d690f04c7bae8c24bd373..d3fee1ade05e62aac5af760cb0f6546c35cfc20d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala @@ -63,10 +63,6 @@ private[sql] case object Complete extends AggregateMode */ private[sql] case object NoOp extends Expression with Unevaluable { override def nullable: Boolean = true - override def eval(input: InternalRow): Any = { - throw new TreeNodeException( - this, s"No function to evaluate expression. type: ${this.nodeName}") - } override def dataType: DataType = NullType override def children: Seq[Expression] = Nil } @@ -151,8 +147,7 @@ abstract class AggregateFunction2 /** * A helper class for aggregate functions that can be implemented in terms of catalyst expressions. */ -abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable { - self: Product => +abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable with Unevaluable { val initialValues: Seq[Expression] val updateExpressions: Seq[Expression] @@ -188,19 +183,15 @@ abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable { } } - override def update(buffer: MutableRow, input: InternalRow): Unit = { + override final def update(buffer: MutableRow, input: InternalRow): Unit = { throw new UnsupportedOperationException( "AlgebraicAggregate's update should not be called directly") } - override def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = { + override final def merge(buffer1: MutableRow, buffer2: InternalRow): Unit = { throw new UnsupportedOperationException( "AlgebraicAggregate's merge should not be called directly") } - override def eval(buffer: InternalRow): Any = { - throw new UnsupportedOperationException( - "AlgebraicAggregate's eval should not be called directly") - } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index e07c920a41d0ace78a286e257fc7a1454cbc2be7..d3295b8bafa804439a91f114c889b60b6b5bbeee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.expressions import com.clearspring.analytics.stream.cardinality.HyperLogLog import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode} import org.apache.spark.sql.catalyst.util.TypeUtils import org.apache.spark.sql.types._ import org.apache.spark.util.collection.OpenHashSet @@ -71,8 +71,7 @@ trait PartialAggregate1 extends AggregateExpression1 { * A specific implementation of an aggregate function. Used to wrap a generic * [[AggregateExpression1]] with an algorithm that will be used to compute one specific result. */ -abstract class AggregateFunction1 - extends LeafExpression with AggregateExpression1 with Serializable { +abstract class AggregateFunction1 extends LeafExpression with Serializable { /** Base should return the generic aggregate expression that this function is computing */ val base: AggregateExpression1 @@ -82,9 +81,9 @@ abstract class AggregateFunction1 def update(input: InternalRow): Unit - // Do we really need this? - override def newInstance(): AggregateFunction1 = { - makeCopy(productIterator.map { case a: AnyRef => a }.toArray) + override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { + throw new UnsupportedOperationException( + "AggregateFunction1 should not be used for generated aggregates") } }