Skip to content
Snippets Groups Projects
Commit 8ea2cd56 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Adding fix covering combineCombinersByKey as well

parent b683608c
No related branches found
No related tags found
No related merge requests found
......@@ -17,6 +17,8 @@
package org.apache.spark
import scala.{Option, deprecated}
import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
/**
......@@ -35,10 +37,11 @@ case class Aggregator[K, V, C] (
private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
@deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) = combineValuesByKey(iter, null)
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
combineValuesByKey(iter, null)
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
context: TaskContext) : Iterator[(K, C)] = {
context: TaskContext): Iterator[(K, C)] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
......@@ -63,6 +66,10 @@ case class Aggregator[K, V, C] (
}
}
@deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] =
combineCombinersByKey(iter, null)
def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
if (!externalSorting) {
val combiners = new AppendOnlyMap[K,C]
......@@ -81,8 +88,9 @@ case class Aggregator[K, V, C] (
val (k, c) = iter.next()
combiners.insert(k, c)
}
context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
// TODO: Make this non optional in a future release
Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
combiners.iterator
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment