Commit b2d8c022 authored by Sean Owen, committed by Reynold Xin

SPARK-6044 [CORE] RDD.aggregate() should not use the closure serializer on the zero value

Use the configured serializer in RDD.aggregate to clone the zero value, matching PairRDDFunctions.aggregateByKey, instead of the closure serializer.

Compare with https://github.com/apache/spark/blob/e60ad2f4c47b011be7a3198689ac2b82ee317d96/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L127
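For context, a minimal usage sketch (a hypothetical example, not part of the patch) showing where the zero value comes into play: the (0, 0) tuple below is the accumulator seed that RDD.aggregate clones via a serializer before shipping it with tasks, and after this change that clone is made with the configured sc.env.serializer (e.g. Kryo, if so configured) rather than the closure serializer, consistent with aggregateByKey.

import org.apache.spark.{SparkConf, SparkContext}

object AggregateMean {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("agg-example").setMaster("local[2]"))
    val nums = sc.parallelize(1 to 100)
    // zeroValue (0, 0) is the accumulator seed; RDD.aggregate clones it with
    // Utils.clone(zeroValue, ...) so the job starts from a fresh copy.
    val (sum, count) = nums.aggregate((0, 0))(
      (acc, x) => (acc._1 + x, acc._2 + 1),  // seqOp: fold one element into the accumulator
      (a, b) => (a._1 + b._1, a._2 + b._2)   // combOp: merge two partition accumulators
    )
    println(s"mean = ${sum.toDouble / count}")
    sc.stop()
  }
}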

Author: Sean Owen <sowen@cloudera.com>

Closes #5028 from srowen/SPARK-6044 and squashes the following commits:

a4040a7 [Sean Owen] Use configured serializer in RDD.aggregate, to match PairRDDFunctions.aggregateByKey, instead of closure serializer
parent b3e6eca8
@@ -960,7 +960,7 @@ abstract class RDD[T: ClassTag](
    */
   def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
     // Clone the zero value since we will also be serializing it as part of tasks
-    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
+    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
     val cleanSeqOp = sc.clean(seqOp)
     val cleanCombOp = sc.clean(combOp)
     val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
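For reference, Utils.clone round-trips the value through the serializer instance it is given; a simplified sketch of that mechanism (modeled on Spark's Utils.clone, assuming the standard SerializerInstance API):

import scala.reflect.ClassTag
import org.apache.spark.serializer.SerializerInstance

// Deep-copy a value by serializing and immediately deserializing it.
// Whichever serializer instance is passed in determines how the copy is made,
// which is why switching to sc.env.serializer here changes which serializer
// must be able to handle the zero value.
def clone[T: ClassTag](value: T, serializer: SerializerInstance): T =
  serializer.deserialize[T](serializer.serialize(value))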