Skip to content
Snippets Groups Projects
Commit ad5f579c authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Style fixes

parent e5f6d569
No related branches found
No related tags found
No related merge requests found
...@@ -33,9 +33,9 @@ trait JavaTestBase extends TestSuiteBase { ...@@ -33,9 +33,9 @@ trait JavaTestBase extends TestSuiteBase {
* The stream will be derived from the supplied lists of Java objects. * The stream will be derived from the supplied lists of Java objects.
**/ **/
def attachTestInputStream[T]( def attachTestInputStream[T](
ssc: JavaStreamingContext, ssc: JavaStreamingContext,
data: JList[JList[T]], data: JList[JList[T]],
numPartitions: Int) = { numPartitions: Int) = {
val seqData = data.map(Seq(_:_*)) val seqData = data.map(Seq(_:_*))
implicit val cm: ClassManifest[T] = implicit val cm: ClassManifest[T] =
...@@ -50,7 +50,7 @@ trait JavaTestBase extends TestSuiteBase { ...@@ -50,7 +50,7 @@ trait JavaTestBase extends TestSuiteBase {
* [[org.apache.spark.streaming.TestOutputStream]]. * [[org.apache.spark.streaming.TestOutputStream]].
**/ **/
def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]( def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]](
dstream: JavaDStreamLike[T, This, R]) = dstream: JavaDStreamLike[T, This, R]) =
{ {
implicit val cm: ClassManifest[T] = implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
...@@ -66,7 +66,7 @@ trait JavaTestBase extends TestSuiteBase { ...@@ -66,7 +66,7 @@ trait JavaTestBase extends TestSuiteBase {
* Returns a list of items for each RDD. * Returns a list of items for each RDD.
*/ */
def runStreams[V]( def runStreams[V](
ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
implicit val cm: ClassManifest[V] = implicit val cm: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
...@@ -83,16 +83,16 @@ trait JavaTestBase extends TestSuiteBase { ...@@ -83,16 +83,16 @@ trait JavaTestBase extends TestSuiteBase {
* Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
* representing one partition. * representing one partition.
*/ */
def runStreamsWithPartitions[V]( def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int,
ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[JList[V]]] = { numExpectedOutput: Int): JList[JList[JList[V]]] = {
implicit val cm: ClassManifest[V] = implicit val cm: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput) val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
val out = new ArrayList[JList[JList[V]]]() val out = new ArrayList[JList[JList[V]]]()
res.map(entry => { res.map{entry =>
val lists = entry.map(new ArrayList[V](_)) val lists = entry.map(new ArrayList[V](_))
out.append(new ArrayList[JList[V]](lists)) out.append(new ArrayList[JList[V]](lists))
}) }
out out
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment