From 1a0bc41659eef317dcac18df35c26857216a4314 Mon Sep 17 00:00:00 2001
From: DB Tsai <dbt@netflix.com>
Date: Mon, 10 Apr 2017 05:16:34 +0000
Subject: [PATCH] [SPARK-20270][SQL] na.fill should not change the values in
 long or integer when the default value is in double

## What changes were proposed in this pull request?

This bug was partially addressed in SPARK-18555 https://github.com/apache/spark/pull/15994, but the root cause isn't completely solved. This bug is pretty critical since it changes the member id in Long in our application if the member id can not be represented by Double losslessly when the member id is very big.

Here is an example how this happens, with
```
      Seq[(java.lang.Long, java.lang.Double)]((null, 3.14), (9123146099426677101L, null),
        (9123146560113991650L, 1.6), (null, null)).toDF("a", "b").na.fill(0.2),
```
the logical plan will be
```
== Analyzed Logical Plan ==
a: bigint, b: double
Project [cast(coalesce(cast(a#232L as double), cast(0.2 as double)) as bigint) AS a#240L, cast(coalesce(nanvl(b#233, cast(null as double)), 0.2) as double) AS b#241]
+- Project [_1#229L AS a#232L, _2#230 AS b#233]
   +- LocalRelation [_1#229L, _2#230]
```

Note that even the value is not null, Spark will cast the Long into Double first. Then if it's not null, Spark will cast it back to Long which results in losing precision.

The behavior should be that the original value should not be changed if it's not null, but Spark will change the value which is wrong.

With the PR, the logical plan will be
```
== Analyzed Logical Plan ==
a: bigint, b: double
Project [coalesce(a#232L, cast(0.2 as bigint)) AS a#240L, coalesce(nanvl(b#233, cast(null as double)), cast(0.2 as double)) AS b#241]
+- Project [_1#229L AS a#232L, _2#230 AS b#233]
   +- LocalRelation [_1#229L, _2#230]
```
which behaves correctly without changing the original Long values and also avoids extra cost of unnecessary casting.

## How was this patch tested?

unit test added.

+cc srowen rxin cloud-fan gatorsmile

Thanks.

Author: DB Tsai <dbt@netflix.com>

Closes #17577 from dbtsai/fixnafill.
---
 .../apache/spark/sql/DataFrameNaFunctions.scala    |  5 +++--
 .../spark/sql/DataFrameNaFunctionsSuite.scala      | 14 ++++++++++++++
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
index 28820681cd..d8f953fba5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
@@ -407,10 +407,11 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
     val quotedColName = "`" + col.name + "`"
     val colValue = col.dataType match {
       case DoubleType | FloatType =>
-        nanvl(df.col(quotedColName), lit(null)) // nanvl only supports these types
+        // nanvl only supports these types
+        nanvl(df.col(quotedColName), lit(null).cast(col.dataType))
       case _ => df.col(quotedColName)
     }
-    coalesce(colValue, lit(replacement)).cast(col.dataType).as(col.name)
+    coalesce(colValue, lit(replacement).cast(col.dataType)).as(col.name)
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
index fd829846ac..aa237d0619 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala
@@ -145,6 +145,20 @@ class DataFrameNaFunctionsSuite extends QueryTest with SharedSQLContext {
       Row(1, 2) :: Row(-1, -2) :: Row(9123146099426677101L, 9123146560113991650L) :: Nil
     )
 
+    checkAnswer(
+      Seq[(java.lang.Long, java.lang.Double)]((null, 3.14), (9123146099426677101L, null),
+        (9123146560113991650L, 1.6), (null, null)).toDF("a", "b").na.fill(0.2),
+      Row(0, 3.14) :: Row(9123146099426677101L, 0.2) :: Row(9123146560113991650L, 1.6)
+        :: Row(0, 0.2) :: Nil
+    )
+
+    checkAnswer(
+      Seq[(java.lang.Long, java.lang.Float)]((null, 3.14f), (9123146099426677101L, null),
+        (9123146560113991650L, 1.6f), (null, null)).toDF("a", "b").na.fill(0.2),
+      Row(0, 3.14f) :: Row(9123146099426677101L, 0.2f) :: Row(9123146560113991650L, 1.6f)
+        :: Row(0, 0.2f) :: Nil
+    )
+
     checkAnswer(
       Seq[(java.lang.Long, java.lang.Double)]((null, 1.23), (3L, null), (4L, 3.45))
         .toDF("a", "b").na.fill(2.34),
-- 
GitLab