From 8a538c97b556f80f67c80519af0ce879557050d5 Mon Sep 17 00:00:00 2001
From: Ergin Seyfe <eseyfe@fb.com>
Date: Tue, 1 Nov 2016 11:18:42 -0700
Subject: [PATCH] [SPARK-18189][SQL] Fix serialization issue in
 KeyValueGroupedDataset

## What changes were proposed in this pull request?
Likewise [DataSet.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156) KeyValueGroupedDataset should mark the queryExecution as transient.

As mentioned in the Jira ticket, without transient we saw serialization issues like

```
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution
Serialization stack:
        - object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: ==
```

## How was this patch tested?

Run the query which is specified in the Jira ticket before and after:
```
val a = spark.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)]
val grouped = a.groupByKey(
{x:(Int,Int)=>x._1}
)
val mappedGroups = grouped.mapGroups((k,x)=>
{(k,1)}
)
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map(xx=>
{ val simpley = yyy.value 1 }
)
```

Author: Ergin Seyfe <eseyfe@fb.com>

Closes #15706 from seyfe/keyvaluegrouped_serialization.
---
 .../scala/org/apache/spark/repl/ReplSuite.scala | 17 +++++++++++++++++
 .../spark/sql/KeyValueGroupedDataset.scala      |  2 +-
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 9262e938c2..96d2dfc265 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -473,4 +473,21 @@ class ReplSuite extends SparkFunSuite {
     assertDoesNotContain("AssertionError", output)
     assertDoesNotContain("Exception", output)
   }
+
+  test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") {
+    val resultValue = 12345
+    val output = runInterpreter("local",
+      s"""
+         |val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1)
+         |val mapGroups = keyValueGrouped.mapGroups((k, v) => (k, 1))
+         |val broadcasted = sc.broadcast($resultValue)
+         |
+         |// Using broadcast triggers serialization issue in KeyValueGroupedDataset
+         |val dataset = mapGroups.map(_ => broadcasted.value)
+         |dataset.collect()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains(s": Array[Int] = Array($resultValue, $resultValue)", output)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 4cb0313aa9..31ce8eb25e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.expressions.ReduceAggregator
 class KeyValueGroupedDataset[K, V] private[sql](
     kEncoder: Encoder[K],
     vEncoder: Encoder[V],
-    val queryExecution: QueryExecution,
+    @transient val queryExecution: QueryExecution,
     private val dataAttributes: Seq[Attribute],
     private val groupingAttributes: Seq[Attribute]) extends Serializable {
 
-- 
GitLab