Commit 8a538c97 authored by Ergin Seyfe's avatar Ergin Seyfe Committed by Reynold Xin

[SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset

## What changes were proposed in this pull request?
Like [Dataset.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156), `KeyValueGroupedDataset` should mark its `queryExecution` field as `@transient`.

As mentioned in the Jira ticket, without `@transient` we saw serialization failures such as:

```
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution
Serialization stack:
        - object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: ==
```
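The failure mode is standard Java serialization behavior: writing a `Serializable` object fails if any of its non-transient fields is not itself serializable, while `@transient` fields are simply skipped (and come back as `null` on deserialization). A minimal, Spark-free sketch of the mechanism (`NotSer` and `Holder` are hypothetical stand-ins, not Spark classes):

```scala
import java.io._

class NotSer // stand-in for QueryExecution: not Serializable

// @transient tells Java serialization to skip this field entirely.
class Holder(@transient val heavy: NotSer, val id: Int) extends Serializable

object TransientDemo extends App {
  val buf = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buf)
  out.writeObject(new Holder(new NotSer, 42)) // succeeds only because heavy is @transient
  out.close()

  val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
  val restored = in.readObject().asInstanceOf[Holder]
  assert(restored.id == 42)     // ordinary fields round-trip
  assert(restored.heavy == null) // transient fields are restored as null
  println("ok")
}
```

Removing the `@transient` annotation from `heavy` makes `writeObject` throw exactly the `NotSerializableException` shown above.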

## How was this patch tested?

Ran the query specified in the Jira ticket before and after the change:
```
val a = spark.createDataFrame(sc.parallelize(Seq((1, 2), (3, 4)))).as[(Int, Int)]
val grouped = a.groupByKey { x: (Int, Int) => x._1 }
val mappedGroups = grouped.mapGroups { (k, x) => (k, 1) }
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map { xx =>
  val simpley = yyy.value
  1
}
```
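Why does mapping over `mappedGroups.rdd` drag the `KeyValueGroupedDataset` into serialization at all? A lambda that references its enclosing instance captures `this`, and Java serialization then tries to write every non-transient field of that instance. A simplified, Spark-free sketch of this capture chain (the `Outer`/`NotSer` names are hypothetical):

```scala
import java.io._

class NotSer // stand-in for QueryExecution

class Outer extends Serializable {
  val qe = new NotSer // non-transient reference to a non-serializable object

  // The lambda calls a method on `this`, so serializing the lambda serializes
  // Outer, which in turn tries (and fails) to serialize qe.
  def makeFn: Int => Int = x => { this.hashCode(); x }
}

object CaptureDemo extends App {
  def canSerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch { case _: NotSerializableException => false }

  assert(!canSerialize(new Outer().makeFn)) // fails via the captured Outer.qe
  println("capture demo ok")
}
```

Marking `qe` as `@transient` in this sketch makes the closure serializable again, which is exactly what the patch does for `queryExecution`.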

Author: Ergin Seyfe <eseyfe@fb.com>

Closes #15706 from seyfe/keyvaluegrouped_serialization.
parent 8cdf143f
```
@@ -473,4 +473,21 @@ class ReplSuite extends SparkFunSuite {
     assertDoesNotContain("AssertionError", output)
     assertDoesNotContain("Exception", output)
   }
+
+  test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") {
+    val resultValue = 12345
+    val output = runInterpreter("local",
+      s"""
+         |val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1)
+         |val mapGroups = keyValueGrouped.mapGroups((k, v) => (k, 1))
+         |val broadcasted = sc.broadcast($resultValue)
+         |
+         |// Using broadcast triggers serialization issue in KeyValueGroupedDataset
+         |val dataset = mapGroups.map(_ => broadcasted.value)
+         |dataset.collect()
+       """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains(s": Array[Int] = Array($resultValue, $resultValue)", output)
+  }
 }
```
```
@@ -40,7 +40,7 @@ import org.apache.spark.sql.expressions.ReduceAggregator
 class KeyValueGroupedDataset[K, V] private[sql](
     kEncoder: Encoder[K],
     vEncoder: Encoder[V],
-    val queryExecution: QueryExecution,
+    @transient val queryExecution: QueryExecution,
     private val dataAttributes: Seq[Attribute],
     private val groupingAttributes: Seq[Attribute]) extends Serializable {
```