From acb0323053d270a377e497e975b2dfe59e2f997c Mon Sep 17 00:00:00 2001
From: Hossein Falaki <falaki@gmail.com>
Date: Tue, 31 Dec 2013 15:34:26 -0800
Subject: [PATCH] minor improvements

---
 .../main/scala/org/apache/spark/rdd/PairRDDFunctions.scala   | 5 ++---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala           | 4 +++-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 1dc5f8d2f5..088b298aad 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -229,9 +229,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     }
     val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
 
-    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).map {
-      case (k, v) => (k, v.value.cardinality())
-    }
+    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
+
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 74fab48619..161fd067e1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -809,7 +809,9 @@ abstract class RDD[T: ClassTag](
     }
     def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog) = c1.merge(c2)
 
-    mapPartitions(hllCountPartition).reduce(mergeCounters).value.cardinality()
+    val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
+    mapPartitions(hllCountPartition).aggregate(zeroCounter)(mergeCounters, mergeCounters)
+      .value.cardinality()
   }
 
   /**
-- 
GitLab