diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 780f7b823b1018dc1db5e5667b3439496a6e34a3..5e384eeee45f385ff83bba34f93c4b2228f5faaa 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -33,9 +33,9 @@ trait JavaTestBase extends TestSuiteBase { * The stream will be derived from the supplied lists of Java objects. **/ def attachTestInputStream[T]( - ssc: JavaStreamingContext, - data: JList[JList[T]], - numPartitions: Int) = { + ssc: JavaStreamingContext, + data: JList[JList[T]], + numPartitions: Int) = { val seqData = data.map(Seq(_:_*)) implicit val cm: ClassManifest[T] = @@ -50,7 +50,7 @@ trait JavaTestBase extends TestSuiteBase { * [[org.apache.spark.streaming.TestOutputStream]]. **/ def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]( - dstream: JavaDStreamLike[T, This, R]) = + dstream: JavaDStreamLike[T, This, R]) = { implicit val cm: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] @@ -66,7 +66,7 @@ trait JavaTestBase extends TestSuiteBase { * Returns a list of items for each RDD. */ def runStreams[V]( - ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { + ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { implicit val cm: ClassManifest[V] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) @@ -83,16 +83,16 @@ trait JavaTestBase extends TestSuiteBase { * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each * representing one partition. */ - def runStreamsWithPartitions[V]( - ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[JList[V]]] = { + def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int, + numExpectedOutput: Int): JList[JList[JList[V]]] = { implicit val cm: ClassManifest[V] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput) val out = new ArrayList[JList[JList[V]]]() - res.map(entry => { + res.map{entry => val lists = entry.map(new ArrayList[V](_)) out.append(new ArrayList[JList[V]](lists)) - }) + } out } }