diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index c43e1f2fe135e5a5eaa559b398b590c68f5e4f39..b0434c9a3b6b3cb0ed2da26449ae06ec1220bf2c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -84,7 +84,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         throw new SparkException("Default partitioner cannot partition array keys.")
       }
     }
-    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+    val aggregator = new Aggregator[K, V, C](
+      self.context.clean(createCombiner),
+      self.context.clean(mergeValue),
+      self.context.clean(mergeCombiners))
     if (self.partitioner == Some(partitioner)) {
       self.mapPartitions(iter => {
         val context = TaskContext.get()