Skip to content
Snippets Groups Projects
Commit 575aff6b authored by Tathagata Das's avatar Tathagata Das
Browse files

Merge pull request #567 from Reinvigorate/sm-count-fix

Fixing count() in Spark Streaming
parents 8ac9efba b95c1bdb
No related branches found
No related tags found
No related merge requests found
......@@ -441,7 +441,7 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
def count(): DStream[Long] = this.map(_ => (null, 1L)).transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1))).reduceByKey(_ + _).map(_._2)
/**
* Return a new DStream in which each RDD contains the counts of each distinct value in
......
......@@ -90,9 +90,9 @@ class BasicOperationsSuite extends TestSuiteBase {
test("count") {
testOperation(
Seq(1 to 1, 1 to 2, 1 to 3, 1 to 4),
Seq(Seq(), 1 to 1, 1 to 2, 1 to 3, 1 to 4),
(s: DStream[Int]) => s.count(),
Seq(Seq(1L), Seq(2L), Seq(3L), Seq(4L))
Seq(Seq(0L), Seq(1L), Seq(2L), Seq(3L), Seq(4L))
)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment