diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 6d439fdc684afaae595ad34e6d521f923633c9e7..d712927a21514db095765c3868fbef5ba9ea22e6 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -34,6 +34,9 @@ case class Aggregator[K, V, C] (
   private val sparkConf = SparkEnv.get.conf
   private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
 
+  @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
+  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) = combineValuesByKey(iter, null)
+
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext)
       : Iterator[(K, C)] = {
     if (!externalSorting) {
@@ -53,8 +56,9 @@ case class Aggregator[K, V, C] (
         val (k, v) = iter.next()
         combiners.insert(k, v)
       }
-      context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
-      context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
+      // TODO: Make this non optional in a future release
+      Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
+      Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
       combiners.iterator
     }
   }
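
For reviewers, a minimal standalone sketch of the null-guard introduced above is shown next. The TaskContext and TaskMetrics classes in the sketch are simplified stand-ins rather than Spark's real classes; it only illustrates why wrapping the possibly-null context in Option makes the spill-metric updates a safe no-op when the deprecated single-argument overload passes null.

// Minimal, self-contained sketch (not Spark's real API): simplified stand-ins for
// TaskContext/TaskMetrics, used only to show how the Option(context) guard behaves.
class TaskMetrics {
  var memoryBytesSpilled: Long = 0L
  var diskBytesSpilled: Long = 0L
}

class TaskContext {
  val taskMetrics = new TaskMetrics
}

object SpillMetricsDemo {
  // Mirrors the pattern in the patch: wrap the possibly-null context in Option so
  // metric updates are skipped instead of throwing a NullPointerException.
  def recordSpill(context: TaskContext, memorySpilled: Long, diskSpilled: Long): Unit = {
    Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = memorySpilled)
    Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = diskSpilled)
  }

  def main(args: Array[String]): Unit = {
    val ctx = new TaskContext
    recordSpill(ctx, memorySpilled = 1024L, diskSpilled = 2048L)
    println(ctx.taskMetrics.memoryBytesSpilled)  // prints 1024

    // Passing null, as the deprecated single-argument overload does, is now safe.
    recordSpill(null, memorySpilled = 512L, diskSpilled = 0L)
  }
}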