Skip to content
Snippets Groups Projects
Commit bb4102b0 authored by Prashant Sharma's avatar Prashant Sharma
Browse files

Fixed breaking tests in streaming checkpoint suite. Changed RichInt to Int as...

Fixed breaking tests in streaming checkpoint suite. Changed RichInt to Int as it is final and not serializable
parent ad88f083
No related branches found
No related tags found
No related merge requests found
package spark.streaming
import dstream.FileInputDStream
import spark.streaming.StreamingContext._
import java.io.File
import runtime.RichInt
import org.scalatest.BeforeAndAfter
import scala.collection.mutable.ArrayBuffer
import org.apache.commons.io.FileUtils
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import util.{Clock, ManualClock}
import scala.util.Random
import org.scalatest.BeforeAndAfter
import com.google.common.io.Files
import spark.streaming.StreamingContext.toPairDStreamFunctions
import spark.streaming.dstream.FileInputDStream
import spark.streaming.util.ManualClock
/**
* This test suites tests the checkpointing functionality of DStreams -
......@@ -56,13 +59,13 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Setup the streams
val input = (1 to 10).map(_ => Seq("a")).toSeq
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
}
st.map(x => (x, 1))
.updateStateByKey[RichInt](updateFunc)
.updateStateByKey(updateFunc)
.checkpoint(stateStreamCheckpointInterval)
.map(t => (t._1, t._2.self))
.map(t => (t._1, t._2))
}
var ssc = setupStreams(input, operation)
var stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
......@@ -162,13 +165,13 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val input = (1 to 10).map(_ => Seq("a")).toSeq
val output = (1 to 10).map(x => Seq(("a", x))).toSeq
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
}
st.map(x => (x, 1))
.updateStateByKey[RichInt](updateFunc)
.updateStateByKey(updateFunc)
.checkpoint(batchDuration * 2)
.map(t => (t._1, t._2.self))
.map(t => (t._1, t._2))
}
testCheckpointedOperation(input, operation, output, 7)
}
......@@ -350,4 +353,4 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
outputStream.output
}
}
\ No newline at end of file
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment