diff --git a/.gitignore b/.gitignore
index c207409e3cfe087723ec38fa75fca68404bf251c..88d7b56181be7607e315b5f01990ed8951336dc9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,7 @@ third_party/libmesos.so
 third_party/libmesos.dylib
 conf/java-opts
 conf/spark-env.sh
+conf/streaming-env.sh
 conf/log4j.properties
 docs/_site
 docs/api
@@ -31,4 +32,5 @@ project/plugins/src_managed/
 logs/
 log/
 spark-tests.log
+streaming-tests.log
 dependency-reduced-pom.xml
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 292ad3b9f90d33d69106b9f0a554fe39b8f34d18..beba9cfd4ffb0f9908a83f4a2045509f9a76c08b 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -189,7 +189,7 @@ abstract class DStream[T: ClassManifest] (
     val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds
     logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
     assert(
-      metadataCleanerDelay < 0 || rememberDuration < metadataCleanerDelay * 1000,
+      metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
       "It seems you are doing some DStream window operation or setting a checkpoint interval " +
       "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
       "than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
diff --git a/streaming/src/main/scala/spark/streaming/JobManager.scala b/streaming/src/main/scala/spark/streaming/JobManager.scala
index fda7264a2742c0762149ea20e84c299f68f51d82..3b910538e028925bd9125db6ab4e2045717a2c9e 100644
--- a/streaming/src/main/scala/spark/streaming/JobManager.scala
+++ b/streaming/src/main/scala/spark/streaming/JobManager.scala
@@ -14,7 +14,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
       try {
         val timeTaken = job.run()
         logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
-          (System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0))
+          (System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, timeTaken / 1000.0))
       } catch {
         case e: Exception =>
           logError("Running " + job + " failed", e)
diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala
index aeb7c3eb0e031ceb67f80bf00abdf535654da46e..eb40affe6d5798f19537ba40f3045bf898bb56b9 100644
--- a/streaming/src/main/scala/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala
@@ -22,7 +22,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {

   val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
   val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
-  val timer = new RecurringTimer(clock, ssc.graph.batchDuration, generateRDDs(_))
+  val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, generateRDDs(_))

   def start() {
     // If context was started from checkpoint, then restart timer such that
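The three call sites above all change for the same reason: the implicit Time-to-Long conversion is removed from object Time later in this patch, so a Time value no longer silently widens to a Long and durations have to be unwrapped explicitly. A minimal, stand-alone sketch (Time is trimmed to the two members relevant here, and scheduleEvery is a hypothetical stand-in for an API that takes raw milliseconds, like RecurringTimer's period):

// Trimmed-down Time, as in Time.scala below: a Long wrapped with an explicit accessor.
case class Time(private val millis: Long) {
  def milliseconds: Long = millis
}

// Hypothetical consumer that expects a plain millisecond count.
def scheduleEvery(periodMillis: Long): Unit = ()

val batchDuration = Time(1000)
// scheduleEvery(batchDuration)            // compiled only via the old implicit toLong
scheduleEvery(batchDuration.milliseconds)  // explicit unwrap, as in the Scheduler change above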
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index ef73049a81c90c6e3258a44225196513862ab7a9..7256e41af9642ca5a89a9f68bb3683a68955d335 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -15,7 +15,6 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-import org.apache.flume.source.avro.AvroFlumeEvent
 import org.apache.hadoop.fs.Path
 import java.util.UUID
@@ -101,14 +100,27 @@ class StreamingContext private (
   protected[streaming] var receiverJobThread: Thread = null
   protected[streaming] var scheduler: Scheduler = null

+  /**
+   * Sets each DStream in this context to remember the RDDs it generated in the last given duration.
+   * DStreams remember RDDs only for a limited duration of time and release them for garbage
+   * collection. This method allows the developer to specify how long to remember the RDDs (
+   * if the developer wishes to query old data outside the DStream computation).
+   * @param duration Minimum duration that each DStream should remember its RDDs
+   */
   def remember(duration: Time) {
     graph.remember(duration)
   }

-  def checkpoint(dir: String, interval: Time = null) {
-    if (dir != null) {
-      sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir))
-      checkpointDir = dir
+  /**
+   * Sets the context to periodically checkpoint the DStream operations for master
+   * fault-tolerance. By default, the graph will be checkpointed every batch interval.
+   * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
+   * @param interval checkpoint interval
+   */
+  def checkpoint(directory: String, interval: Time = null) {
+    if (directory != null) {
+      sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
+      checkpointDir = directory
       checkpointInterval = interval
     } else {
       checkpointDir = null
@@ -122,9 +134,8 @@ class StreamingContext private (

   protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()

-  /**
+  /**
    * Create an input stream that pulls messages form a Kafka Broker.
-   *
    * @param hostname Zookeper hostname.
    * @param port Zookeper port.
    * @param groupId The group id for this consumer.
@@ -147,6 +158,15 @@ class StreamingContext private (
     inputStream
   }

+  /**
+   * Create an input stream from a network source hostname:port. Data is received using
+   * a TCP socket and the received bytes are interpreted as UTF8 encoded \n delimited
+   * lines.
+   * @param hostname Hostname to connect to for receiving data
+   * @param port Port to connect to for receiving data
+   * @param storageLevel Storage level to use for storing the received objects
+   *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+   */
   def networkTextStream(
       hostname: String,
       port: Int,
@@ -155,6 +175,16 @@ class StreamingContext private (
     networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
   }

+  /**
+   * Create an input stream from a network source hostname:port. Data is received using
+   * a TCP socket and the received bytes are interpreted as objects using the given
+   * converter.
+   * @param hostname Hostname to connect to for receiving data
+   * @param port Port to connect to for receiving data
+   * @param converter Function to convert the byte stream to objects
+   * @param storageLevel Storage level to use for storing the received objects
+   * @tparam T Type of the objects received (after converting bytes to objects)
+   */
   def networkStream[T: ClassManifest](
       hostname: String,
       port: Int,
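Taken together, the methods documented above compose as follows. This is a usage sketch, not part of the patch: the three-argument StreamingContext constructor (master URL, application name, batch duration) is assumed from the rest of the streaming module, and the host, port, and checkpoint directory are placeholder values.

import spark.streaming.{Minutes, Seconds, StreamingContext}

// "local[2]" so the socket receiver and the batch processing each get a slot.
val ssc = new StreamingContext("local[2]", "StreamingDocsExample", Seconds(1))

// Checkpoint the DStream graph to an HDFS-compatible directory every 10 seconds,
// and keep generated RDDs around for a minute for ad-hoc queries.
ssc.checkpoint("hdfs://namenode:8020/user/spark/checkpoints", interval = Seconds(10))
ssc.remember(Minutes(1))

// UTF8, newline-delimited text received over a TCP socket, with the default storage level.
val lines = ssc.networkTextStream("localhost", 9999)
val lineLengths = lines.map(_.length)

ssc.start()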
@@ -166,16 +196,32 @@ class StreamingContext private (
     inputStream
   }

+  /**
+   * Creates an input stream from a Flume source.
+   * @param hostname Hostname of the slave machine to which the Flume data will be sent
+   * @param port Port of the slave machine to which the Flume data will be sent
+   * @param storageLevel Storage level to use for storing the received objects
+   */
   def flumeStream (
-    hostname: String,
-    port: Int,
-    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): DStream[SparkFlumeEvent] = {
+    hostname: String,
+    port: Int,
+    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+  ): DStream[SparkFlumeEvent] = {
     val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
     graph.addInputStream(inputStream)
     inputStream
   }
-
+
+  /**
+   * Create an input stream from a network source hostname:port, where data is received
+   * as serialized blocks (serialized using Spark's serializer) that can be directly
+   * pushed into the block manager without deserializing them. This is the most efficient
+   * way to receive data.
+   * @param hostname Hostname to connect to for receiving data
+   * @param port Port to connect to for receiving data
+   * @param storageLevel Storage level to use for storing the received objects
+   * @tparam T Type of the objects in the received blocks
+   */
   def rawNetworkStream[T: ClassManifest](
       hostname: String,
       port: Int,
@@ -188,7 +234,11 @@ class StreamingContext private (

   /**
    * Creates a input stream that monitors a Hadoop-compatible filesystem
-   * for new files and executes the necessary processing on them.
+   * for new files and reads them using the given key-value types and input format.
+   * @param directory HDFS directory to monitor for new files
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
    */
   def fileStream[
     K: ClassManifest,
@@ -200,13 +250,23 @@ class StreamingContext private (
     inputStream
   }

+  /**
+   * Creates an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them as text files (using key as LongWritable, value
+   * as Text and input format as TextInputFormat).
+   * @param directory HDFS directory to monitor for new files
+   */
   def textFileStream(directory: String): DStream[String] = {
     fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
   }

   /**
    * Creates a input stream from an queue of RDDs. In each batch,
-   * it will process either one or all of the RDDs returned by the queue
+   * it will process either one or all of the RDDs returned by the queue.
+   * @param queue Queue of RDDs
+   * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
+   * @param defaultRDD Default RDD returned by the DStream when the queue is empty
+   * @tparam T Type of objects in the RDD
    */
   def queueStream[T: ClassManifest](
       queue: Queue[RDD[T]],
@@ -218,13 +278,9 @@ class StreamingContext private (
     inputStream
   }

-  def queueStream[T: ClassManifest](array: Array[RDD[T]]): DStream[T] = {
-    val queue = new Queue[RDD[T]]
-    val inputStream = queueStream(queue, true, null)
-    queue ++= array
-    inputStream
-  }
-
+  /**
+   * Create a unified DStream from multiple DStreams of the same type and same interval.
+   */
   def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
     new UnionDStream[T](streams.toArray)
   }
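A sketch of how queueStream and union fit together, matching the signatures documented above. The StreamingContext constructor arguments are again assumed, and the queue starts empty; RDDs pushed into it later are picked up batch by batch.

import scala.collection.mutable.Queue
import spark.RDD
import spark.streaming.{DStream, Seconds, StreamingContext}

val ssc = new StreamingContext("local", "QueueStreamExample", Seconds(1))

// Each batch consumes at most one RDD from this queue; defaultRDD = null means
// no fallback RDD is supplied for intervals when the queue is empty.
val rddQueue = new Queue[RDD[Int]]()
val queued: DStream[Int] = ssc.queueStream(rddQueue, oneAtATime = true, defaultRDD = null)

// union requires the streams to share an element type.
val doubled = queued.map(_ * 2)
val merged = ssc.union(Seq(queued, doubled))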
@@ -256,7 +312,7 @@ class StreamingContext private (
   }

   /**
-   * This function starts the execution of the streams.
+   * Starts the execution of the streams.
    */
   def start() {
     if (checkpointDir != null && checkpointInterval == null && graph != null) {
@@ -284,7 +340,7 @@ class StreamingContext private (
   }

   /**
-   * This function stops the execution of the streams.
+   * Stops the execution of the streams.
    */
   def stop() {
     try {
@@ -302,6 +358,10 @@ class StreamingContext private (

 object StreamingContext {
+  implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
+    new PairDStreamFunctions[K, V](stream)
+  }
+
   protected[streaming] def createNewSparkContext(master: String, frameworkName: String): SparkContext = {

     // Set the default cleaner delay to an hour if not already set.
@@ -312,10 +372,6 @@ object StreamingContext {
     new SparkContext(master, frameworkName)
   }

-  implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
-    new PairDStreamFunctions[K, V](stream)
-  }
-
   protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
     if (prefix == null) {
       time.milliseconds.toString
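The implicit conversion now placed at the top of object StreamingContext is what gives pair DStreams their extra operations. A sketch of how it is picked up at a call site; flatMap on DStream and reduceByKey on PairDStreamFunctions are assumptions here, since neither appears in this patch.

import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._   // brings toPairDStreamFunctions into scope

val ssc = new StreamingContext("local[2]", "PairOpsExample", Seconds(1))

val words = ssc.networkTextStream("localhost", 9999).flatMap(_.split(" "))
// (word, 1) pairs form a DStream[(String, Int)], so the implicit conversion wraps
// them in PairDStreamFunctions and key-based operations become available.
val counts = words.map(word => (word, 1)).reduceByKey(_ + _)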
diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala
index 2976e5e87be2eeed5324ec6190a4e0fe340c9e72..3c6fd5d9671612e52f95174827d092ffa7ee5ce4 100644
--- a/streaming/src/main/scala/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/spark/streaming/Time.scala
@@ -1,16 +1,18 @@
 package spark.streaming

 /**
- * This class is simple wrapper class that represents time in UTC.
- * @param millis Time in UTC long
+ * This is a simple class that represents time. Internally, it represents time as UTC milliseconds.
+ * The recommended way to create instances of Time is to use the helper objects
+ * [[spark.streaming.Milliseconds]], [[spark.streaming.Seconds]], and [[spark.streaming.Minutes]].
+ * @param millis Time in UTC milliseconds.
  */
 case class Time(private val millis: Long) {

   def < (that: Time): Boolean = (this.millis < that.millis)
-
+
   def <= (that: Time): Boolean = (this.millis <= that.millis)
-
+
   def > (that: Time): Boolean = (this.millis > that.millis)

   def >= (that: Time): Boolean = (this.millis >= that.millis)
@@ -45,23 +47,33 @@ case class Time(private val millis: Long) {
   def milliseconds: Long = millis
 }

-object Time {
+private[streaming] object Time {
   val zero = Time(0)

   implicit def toTime(long: Long) = Time(long)
-
-  implicit def toLong(time: Time) = time.milliseconds
 }

+/**
+ * Helper object that creates instances of [[spark.streaming.Time]] representing
+ * a given number of milliseconds.
+ */
 object Milliseconds {
   def apply(milliseconds: Long) = Time(milliseconds)
 }

+/**
+ * Helper object that creates instances of [[spark.streaming.Time]] representing
+ * a given number of seconds.
+ */
 object Seconds {
   def apply(seconds: Long) = Time(seconds * 1000)
-}
+}

-object Minutes {
+/**
+ * Helper object that creates instances of [[spark.streaming.Time]] representing
+ * a given number of minutes.
+ */
+object Minutes {
   def apply(minutes: Long) = Time(minutes * 60000)
 }
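A quick check of the public surface this file exposes after the change: the three helper objects, the comparison operators, and the milliseconds accessor. The companion object is now package-private, so the helper objects are the intended way to construct a Time from user code.

import spark.streaming.{Milliseconds, Minutes, Seconds}

val batchInterval = Seconds(2)            // Time(2000) under the hood
val checkpointInterval = Minutes(1)       // Time(60000)
val slack = Milliseconds(500)

assert(batchInterval < checkpointInterval)
assert(slack <= batchInterval)
assert(batchInterval.milliseconds == 2000L)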
diff --git a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
index 2e427dadf75f343b2681a50e5205c9aa8ad4f45e..bc23d423d3559956abd9c29459edaad5a445c031 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala
@@ -4,6 +4,7 @@ import spark.{RDD, Partitioner}
 import spark.rdd.CoGroupedRDD
 import spark.streaming.{Time, DStream}

+private[streaming]
 class CoGroupedDStream[K : ClassManifest](
     parents: Seq[DStream[(_, _)]],
     partitioner: Partitioner
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index 8cdaff467b00e64783c05db7e4caacb07a5cca7e..cf720953248400485e81bd8658804e2bb7d8e3d8 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -10,7 +10,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

 import scala.collection.mutable.HashSet

-
+private[streaming]
 class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
     @transient ssc_ : StreamingContext,
     directory: String,
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
index 7e988cadf44a185befe3b621175bdf79fbb375d7..ff73225e0f3e4d9c32dc7950ea938b5883f86a37 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
@@ -17,6 +17,7 @@ import java.net.InetSocketAddress
 import java.io.{ObjectInput, ObjectOutput, Externalizable}
 import java.nio.ByteBuffer

+private[streaming]
 class FlumeInputDStream[T: ClassManifest](
     @transient ssc_ : StreamingContext,
     host: String,
@@ -93,6 +94,7 @@ private[streaming] object SparkFlumeEvent {
 }

 /** A simple server that implements Flume's Avro protocol. */
+private[streaming]
 class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
   override def append(event : AvroFlumeEvent) : Status = {
     receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)
@@ -108,12 +110,13 @@ class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {

 /** A NetworkReceiver which listens for events using the
   * Flume Avro interface.*/
+private[streaming]
 class FlumeReceiver(
-  streamId: Int,
-  host: String,
-  port: Int,
-  storageLevel: StorageLevel
-  ) extends NetworkReceiver[SparkFlumeEvent](streamId) {
+    streamId: Int,
+    host: String,
+    port: Int,
+    storageLevel: StorageLevel
+  ) extends NetworkReceiver[SparkFlumeEvent](streamId) {

   lazy val dataHandler = new DataHandler(this, storageLevel)
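These receiver classes are now internal; applications keep going through StreamingContext.flumeStream, documented earlier in this patch. A usage sketch with placeholder host and port (a Flume Avro sink would be pointed at the worker running the receiver), relying on the default storage level:

import spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext("local[2]", "FlumeExample", Seconds(1))

// Returns a DStream[SparkFlumeEvent]; 41414 is just an example Avro sink port.
val flumeEvents = ssc.flumeStream("worker-1.example.com", 41414)
// The result supports the usual DStream transformations.
val markers = flumeEvents.map(_ => 1)

ssc.start()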
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index a46721af2fdcd719de598a6d867ef540527340d7..175c75bcb94f3a9337322b30639d5dc5a0db0030 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -21,10 +21,12 @@ import scala.collection.JavaConversions._
 case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)

 // NOT USED - Originally intended for fault-tolerance
 // Metadata for a Kafka Stream that it sent to the Master
+private[streaming]
 case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])

 // NOT USED - Originally intended for fault-tolerance
 // Checkpoint data specific to a KafkaInputDstream
-case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
+private[streaming]
+case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
   savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
@@ -39,6 +41,7 @@ case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
  * By default the value is pulled from zookeper.
  * @param storageLevel RDD storage level.
  */
+private[streaming]
 class KafkaInputDStream[T: ClassManifest](
   @transient ssc_ : StreamingContext,
   host: String,
@@ -98,6 +101,7 @@ class KafkaInputDStream[T: ClassManifest](
   }
 }

+private[streaming]
 class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
   topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
   storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) {
diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index 996cc7dea8d888f47e5ed96cf1b86c36aae9fd3f..aa2f31cea87dc180c7559483eb905afb121a850f 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -17,6 +17,7 @@ import java.util.concurrent.ArrayBlockingQueue
  * data into Spark Streaming, though it requires the sender to batch data and serialize it
  * in the format that the system is configured with.
  */
+private[streaming]
 class RawInputDStream[T: ClassManifest](
     @transient ssc_ : StreamingContext,
     host: String,
@@ -29,6 +30,7 @@ class RawInputDStream[T: ClassManifest](
   }
 }

+private[streaming]
 class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel)
   extends NetworkReceiver[Any](streamId) {
diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
index 2686de14d2c108d3d3ebea5ba5f3c057fdc16b73..d289ed2a3f49b930c1efee2d9a38bf18742c15e1 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -11,6 +11,7 @@ import spark.storage.StorageLevel
 import scala.collection.mutable.ArrayBuffer
 import spark.streaming.{Interval, Time, DStream}

+private[streaming]
 class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     parent: DStream[(K, V)],
     reduceFunc: (V, V) => V,
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index af5b73ae8dfb9689526b9d9c1d77cabd4e6fffc6..cbe437229973001646a1489468bd72f0a148e215 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -6,6 +6,7 @@ import spark.storage.StorageLevel
 import java.io._
 import java.net.Socket

+private[streaming]
 class SocketInputDStream[T: ClassManifest](
     @transient ssc_ : StreamingContext,
     host: String,
@@ -19,7 +20,7 @@ class SocketInputDStream[T: ClassManifest](
   }
 }

-
+private[streaming]
 class SocketReceiver[T: ClassManifest](
     streamId: Int,
     host: String,
@@ -50,7 +51,7 @@ class SocketReceiver[T: ClassManifest](

 }

-
+private[streaming]
 object SocketReceiver  {

   /**
diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index 6e190b55642b79391068b81238a8dd6faa86843d..175b3060c146a9bbab2ddd17ddfe83f982044a0b 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -6,6 +6,7 @@ import spark.SparkContext._
 import spark.storage.StorageLevel
 import spark.streaming.{Time, DStream}

+private[streaming]
 class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
     parent: DStream[(K, V)],
     updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
diff --git a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
index f1efb2ae7238e9499ce01407d350e7babbffadce..3bf4c2ecea82e77cd0a170f29bc45f91a38209d6 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala
@@ -5,6 +5,7 @@ import spark.RDD
 import collection.mutable.ArrayBuffer
 import spark.rdd.UnionRDD

+private[streaming]
 class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
   extends DStream[T](parents.head.ssc) {
diff --git a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
index 4b2621c4971e67cf341dcf74bf8884dd74f74472..7718794cbfa05ea97e00662ae48150a1e0c035df 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala
@@ -5,7 +5,7 @@ import spark.rdd.UnionRDD
 import spark.storage.StorageLevel
 import spark.streaming.{Interval, Time, DStream}

-
+private[streaming]
 class WindowedDStream[T: ClassManifest](
     parent: DStream[T],
     _windowTime: Time,
diff --git a/streaming/src/main/scala/spark/streaming/util/Clock.scala b/streaming/src/main/scala/spark/streaming/util/Clock.scala
index ed087e4ea88ef72d21ff6222927f4c1ccd002fab..974651f9f6c3363d2b971228a09a7da6decfde1f 100644
--- a/streaming/src/main/scala/spark/streaming/util/Clock.scala
+++ b/streaming/src/main/scala/spark/streaming/util/Clock.scala
@@ -1,13 +1,12 @@
 package spark.streaming.util

-import spark.streaming._
-
-trait Clock {
+private[streaming]
+trait Clock {
   def currentTime(): Long
   def waitTillTime(targetTime: Long): Long
 }

-
+private[streaming]
 class SystemClock() extends Clock {

   val minPollTime = 25L
@@ -54,6 +53,7 @@ class SystemClock() extends Clock {
   }
 }

+private[streaming]
 class ManualClock() extends Clock {

   var time = 0L
diff --git a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
index dc55fd902b8e323aa415325232535cb6c9d56a36..2e7f4169c93412b3667c102c0dcd268c8f5e9110 100644
--- a/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/spark/streaming/util/RecurringTimer.scala
@@ -1,5 +1,6 @@
 package spark.streaming.util

+private[streaming]
 class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {

   val minPollTime = 25L
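The Clock hierarchy above is what the Scheduler instantiates reflectively at the start of this patch. A sketch of that lookup, written as it would appear inside the spark.streaming package tree now that the clocks are package-private; the property name, the default class name, and the two Clock methods come from this patch, and swapping in ManualClock is the assumed use case for deterministic tests.

import spark.streaming.util.Clock

// Tests could opt into the manual clock; the default below mirrors the Scheduler exactly.
// System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")

val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]

// With SystemClock this returns after roughly 50 ms; with ManualClock it presumably
// returns only once another thread advances the manual time past the target.
val wokeUpAt = clock.waitTillTime(clock.currentTime() + 50)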