From b683608c9f5db0b320c91c4d027fff2e8ce3bd04 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Tue, 14 Jan 2014 12:15:10 -0800
Subject: [PATCH] Deprecate rather than remove old combineValuesByKey function
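
The one-argument overload is kept as a deprecated shim so existing
callers continue to compile. It forwards a null TaskContext, so spill
metrics are simply not recorded on that path. A minimal migration
sketch (the `agg`, `iter`, and `context` values here are hypothetical,
assuming an Aggregator[K, V, C] and a running task):

    // Deprecated since 0.9.0: forwards a null context, so
    // memoryBytesSpilled / diskBytesSpilled are not updated.
    val combined = agg.combineValuesByKey(iter)

    // Preferred: pass the task's TaskContext so spill metrics
    // are recorded when external sorting spills to disk.
    val combinedWithMetrics = agg.combineValuesByKey(iter, context)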

---
 core/src/main/scala/org/apache/spark/Aggregator.scala | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 6d439fdc68..d712927a21 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -34,6 +34,10 @@ case class Aggregator[K, V, C] (
   private val sparkConf = SparkEnv.get.conf
   private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
 
+  @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
+  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
+    combineValuesByKey(iter, null)
+
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
                          context: TaskContext) : Iterator[(K, C)] = {
     if (!externalSorting) {
@@ -53,8 +57,11 @@ case class Aggregator[K, V, C] (
         val (k, v) = iter.next()
         combiners.insert(k, v)
       }
-      context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
-      context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
+      // TODO: Make the TaskContext argument non-optional in a future release.
+      Option(context).foreach { c =>
+        c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
+        c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
+      }
       combiners.iterator
     }
   }
-- 
GitLab