From 50e234ba510acac0f75c080b1b1ea681a3a28449 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN <ueshin@happy-camper.st>
Date: Tue, 27 May 2014 14:55:23 -0700
Subject: [PATCH] [SPARK-1915] [SQL] AverageFunction should not count if the
 evaluated value is null.

Average values differ depending on whether the calculation is done partially or not.
This is because `AverageFunction` (used in the non-partial calculation) increments the count even if the evaluated value is null.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #862 from ueshin/issues/SPARK-1915 and squashes the following commits:

b1ff3c0 [Takuya UESHIN] Modify AverageFunction not to count if the evaluated value is null.

(cherry picked from commit 3b0babad1f0856ee16f9d58e1ead30779a4a6310)
Signed-off-by: Reynold Xin <rxin@apache.org>
---
 .../spark/sql/catalyst/expressions/aggregates.scala    |  9 ++++++---
 .../scala/org/apache/spark/sql/DslQuerySuite.scala     | 10 ++++++++++
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index b49a4614ea..c902433688 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -281,14 +281,17 @@ case class AverageFunction(expr: Expression, base: AggregateExpression)
   private val sum = MutableLiteral(zero.eval(EmptyRow))
   private val sumAsDouble = Cast(sum, DoubleType)
 
-  private val addFunction = Add(sum, Coalesce(Seq(expr, zero)))
+  private def addFunction(value: Any) = Add(sum, Literal(value))
 
   override def eval(input: Row): Any =
     sumAsDouble.eval(EmptyRow).asInstanceOf[Double] / count.toDouble
 
   override def update(input: Row): Unit = {
-    count += 1
-    sum.update(addFunction, input)
+    val evaluatedExpr = expr.eval(input)
+    if (evaluatedExpr != null) {
+      count += 1
+      sum.update(addFunction(evaluatedExpr), input)
+    }
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
index 8197e8a18d..fb599e1e01 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
@@ -115,6 +115,16 @@ class DslQuerySuite extends QueryTest {
       2.0)
   }
 
+  test("null average") {
+    checkAnswer(
+      testData3.groupBy()(Average('b)),
+      2.0)
+
+    checkAnswer(
+      testData3.groupBy()(Average('b), CountDistinct('b :: Nil)),
+      (2.0, 1) :: Nil)
+  }
+
   test("count") {
     assert(testData2.count() === testData2.map(_ => 1).count())
   }
-- 
GitLab