From 39f6f75588b69f07cd963c5e211045fed103695b Mon Sep 17 00:00:00 2001 From: Patrick Wendell <pwendell@gmail.com> Date: Thu, 24 Oct 2013 16:43:33 -0700 Subject: [PATCH] Some clean-up of tests --- .../org/apache/spark/streaming/JavaTestUtils.scala | 3 +-- .../org/apache/spark/streaming/CheckpointSuite.scala | 4 ++-- .../org/apache/spark/streaming/TestSuiteBase.scala | 10 +++++++--- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 8a6604904d..5344ae7682 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -54,8 +54,7 @@ trait JavaTestBase extends TestSuiteBase { { implicit val cm: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - val ostream = new TestOutputStream(dstream.dstream, - new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]) + val ostream = new TestOutputStreamWithPartitions(dstream.dstream) dstream.dstream.ssc.registerOutputStream(ostream) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index a327de80b3..beb20831bd 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -366,7 +366,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { logInfo("Manual clock after advancing = " + clock.time) Thread.sleep(batchDuration.milliseconds) - val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] - outputStream.output + val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]] + outputStream.output.map(_.flatten) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 26f515a778..be140699c2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -63,7 +63,8 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[ * * The buffer contains a sequence of RDD's, each containing a sequence of items */ -class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]]) +class TestOutputStream[T: ClassManifest](parent: DStream[T], + val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]()) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.collect() output += collected @@ -82,9 +83,10 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. * * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each - * containing a sequnce of items. + * containing a sequence of items. */ -class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[Seq[T]]]) +class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], + val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]()) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.glom().collect().map(_.toSeq) output += collected @@ -96,6 +98,8 @@ class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], val o ois.defaultReadObject() output.clear() } + + def toTestOutputStream = new TestOutputStream[T](this.parent, this.output.map(_.flatten)) } /** -- GitLab