diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index e3a9247924f12c2f771533fbeb54158753cd85c2..e125310861ebcd016dabbd3f25e29f9d95b1fbb7 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -441,10 +441,7 @@ abstract class DStream[T: ClassManifest] ( * Return a new DStream in which each RDD has a single element generated by counting each RDD * of this DStream. */ - def count(): DStream[Long] = { - val zero = new ConstantInputDStream(context, context.sparkContext.makeRDD(Seq((null, 0L)), 1)) - this.map(_ => (null, 1L)).union(zero).reduceByKey(_ + _).map(_._2) - } + def count(): DStream[Long] = this.map(_ => (null, 1L)).transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1))).reduceByKey(_ + _).map(_._2) /** * Return a new DStream in which each RDD contains the counts of each distinct value in