diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 9262e938c2a6004e65316818f95bb9882588961f..96d2dfc2658b9966fc9dfc1478b0bc8377a00d1e 100644 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -473,4 +473,21 @@ class ReplSuite extends SparkFunSuite { assertDoesNotContain("AssertionError", output) assertDoesNotContain("Exception", output) } + + test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") { + val resultValue = 12345 + val output = runInterpreter("local", + s""" + |val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1) + |val mapGroups = keyValueGrouped.mapGroups((k, v) => (k, 1)) + |val broadcasted = sc.broadcast($resultValue) + | + |// Using broadcast triggers serialization issue in KeyValueGroupedDataset + |val dataset = mapGroups.map(_ => broadcasted.value) + |dataset.collect() + """.stripMargin) + assertDoesNotContain("error:", output) + assertDoesNotContain("Exception", output) + assertContains(s": Array[Int] = Array($resultValue, $resultValue)", output) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index 4cb0313aa90374d08cb1e615ba5fbec5df159f03..31ce8eb25e80877d15059d554156920fbd323e6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.expressions.ReduceAggregator class KeyValueGroupedDataset[K, V] private[sql]( kEncoder: Encoder[K], vEncoder: Encoder[V], - val queryExecution: QueryExecution, + @transient val queryExecution: QueryExecution, private val dataAttributes: Seq[Attribute], private val groupingAttributes: Seq[Attribute]) extends Serializable {