From 2a28bc61009a170af3853c78f7f36970898a6d56 Mon Sep 17 00:00:00 2001
From: Sean Owen <sowen@cloudera.com>
Date: Mon, 15 Dec 2014 16:06:15 -0800
Subject: [PATCH] SPARK-785 [CORE] ClosureCleaner not invoked on most
 PairRDDFunctions

This looks like a simple but important fix: `combineByKey` should clean its arguments' closures, and that in turn apparently covers all remaining functions in `PairRDDFunctions` that delegate to it.
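
A minimal sketch of what the cleaning buys (hypothetical user code, not part of this patch; `KeyTotals` is invented for illustration): closures defined inside a class can carry a reference to the enclosing instance, and `SparkContext.clean` both strips references a closure does not actually need and fails fast with "Task not serializable" if the closure still cannot be serialized.

    import org.apache.spark.rdd.RDD

    // Hypothetical example: KeyTotals itself is not Serializable.
    class KeyTotals(rdd: RDD[(String, Int)]) {
      def perKey(): RDD[(String, Int)] =
        rdd.combineByKey(
          (v: Int) => v,                    // createCombiner
          (c: Int, v: Int) => c + v,        // mergeValue
          (c1: Int, c2: Int) => c1 + c2)    // mergeCombiners
      // With this patch, combineByKey cleans all three functions, so a
      // stray reference to the enclosing KeyTotals instance that the
      // closures do not actually use is nulled out before they are
      // shipped to executors.
    }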

Author: Sean Owen <sowen@cloudera.com>

Closes #3690 from srowen/SPARK-785 and squashes the following commits:

8df68fe [Sean Owen] Clean context of most remaining functions in PairRDDFunctions, which ultimately call combineByKey
---
 .../main/scala/org/apache/spark/rdd/PairRDDFunctions.scala   | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index c43e1f2fe1..b0434c9a3b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -84,7 +84,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         throw new SparkException("Default partitioner cannot partition array keys.")
       }
     }
-    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+    val aggregator = new Aggregator[K, V, C](
+      self.context.clean(createCombiner),
+      self.context.clean(mergeValue),
+      self.context.clean(mergeCombiners))
     if (self.partitioner == Some(partitioner)) {
       self.mapPartitions(iter => {
         val context = TaskContext.get()
-- 
GitLab