Skip to content
Snippets Groups Projects
Commit 39f6f755 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Some clean-up of tests

parent 9423532f
No related branches found
No related tags found
No related merge requests found
...@@ -54,8 +54,7 @@ trait JavaTestBase extends TestSuiteBase { ...@@ -54,8 +54,7 @@ trait JavaTestBase extends TestSuiteBase {
{ {
implicit val cm: ClassManifest[T] = implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
val ostream = new TestOutputStream(dstream.dstream, val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
dstream.dstream.ssc.registerOutputStream(ostream) dstream.dstream.ssc.registerOutputStream(ostream)
} }
......
...@@ -366,7 +366,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { ...@@ -366,7 +366,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
logInfo("Manual clock after advancing = " + clock.time) logInfo("Manual clock after advancing = " + clock.time)
Thread.sleep(batchDuration.milliseconds) Thread.sleep(batchDuration.milliseconds)
val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
outputStream.output outputStream.output.map(_.flatten)
} }
} }
...@@ -63,7 +63,8 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[ ...@@ -63,7 +63,8 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
* *
* The buffer contains a sequence of RDD's, each containing a sequence of items * The buffer contains a sequence of RDD's, each containing a sequence of items
*/ */
class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]]) class TestOutputStream[T: ClassManifest](parent: DStream[T],
val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect() val collected = rdd.collect()
output += collected output += collected
...@@ -82,9 +83,10 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu ...@@ -82,9 +83,10 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
* ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
* *
* The buffer contains a sequence of RDD's, each containing a sequence of partitions, each * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
* containing a sequnce of items. * containing a sequence of items.
*/ */
class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[Seq[T]]]) class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T],
val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]())
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.glom().collect().map(_.toSeq) val collected = rdd.glom().collect().map(_.toSeq)
output += collected output += collected
...@@ -96,6 +98,8 @@ class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], val o ...@@ -96,6 +98,8 @@ class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], val o
ois.defaultReadObject() ois.defaultReadObject()
output.clear() output.clear()
} }
def toTestOutputStream = new TestOutputStream[T](this.parent, this.output.map(_.flatten))
} }
/** /**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment