From b95c1bdbbaeea86152e24b394a03bbbad95989d5 Mon Sep 17 00:00:00 2001
From: seanm <sean.mcnamara@webtrends.com>
Date: Fri, 10 May 2013 12:47:24 -0600
Subject: [PATCH] count() now uses transform() instead of a ConstantInputDStream

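The per-batch count is still seeded with a (null, 0L) record so that an
empty batch produces 0; the seed RDD is now unioned in via transform()
on each batch instead of through a separately constructed
ConstantInputDStream. For readability, the new one-line body is
equivalent to this formatting-only restatement (not part of the diff
below):

    def count(): DStream[Long] =
      this.map(_ => (null, 1L))
        .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
        .reduceByKey(_ + _)
        .map(_._2)
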
---
 streaming/src/main/scala/spark/streaming/DStream.scala | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index e3a9247924..e125310861 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -441,10 +441,7 @@ abstract class DStream[T: ClassManifest] (
    * Return a new DStream in which each RDD has a single element generated by counting each RDD
    * of this DStream.
    */
-  def count(): DStream[Long] = {
-   val zero = new ConstantInputDStream(context, context.sparkContext.makeRDD(Seq((null, 0L)), 1))
-   this.map(_ => (null, 1L)).union(zero).reduceByKey(_ + _).map(_._2)
-  }
+  def count(): DStream[Long] = this.map(_ => (null, 1L)).transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1))).reduceByKey(_ + _).map(_._2)
 
   /**
    * Return a new DStream in which each RDD contains the counts of each distinct value in
-- 
GitLab
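
Note (not part of the patch): a minimal usage sketch of the unchanged
public behavior; the stream name `lines` is illustrative only, not taken
from this change:

    // Each batch yields a single-element RDD holding that batch's record
    // count; empty batches report 0L thanks to the unioned (null, 0L) seed.
    val counts: DStream[Long] = lines.count()
    counts.print()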