Skip to content
Snippets Groups Projects
Commit b95c1bdb authored by seanm's avatar seanm
Browse files

count() now uses a transform instead of ConstantInputDStream

parent b42d68c8
No related branches found
No related tags found
No related merge requests found
...@@ -441,10 +441,7 @@ abstract class DStream[T: ClassManifest] ( ...@@ -441,10 +441,7 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream in which each RDD has a single element generated by counting each RDD * Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream. * of this DStream.
*/ */
def count(): DStream[Long] = { def count(): DStream[Long] = this.map(_ => (null, 1L)).transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1))).reduceByKey(_ + _).map(_._2)
val zero = new ConstantInputDStream(context, context.sparkContext.makeRDD(Seq((null, 0L)), 1))
this.map(_ => (null, 1L)).union(zero).reduceByKey(_ + _).map(_._2)
}
/** /**
* Return a new DStream in which each RDD contains the counts of each distinct value in * Return a new DStream in which each RDD contains the counts of each distinct value in
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment