diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala new file mode 100644 index 0000000000000000000000000000000000000000..377bc0c98ec1f1185a97e8e559e7151e5aef7e85 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala @@ -0,0 +1,60 @@ +package spark.streaming.examples.twitter + +import spark.streaming.StreamingContext._ +import spark.streaming.{Seconds, StreamingContext} +import spark.SparkContext._ +import spark.storage.StorageLevel + +/** + * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter + * stream. The stream is instantiated with credentials and optionally filters supplied by the + * command line arguments. + */ +object TwitterBasic { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" + + " [filter1] [filter2] ... [filter n]") + System.exit(1) + } + + val Array(master, username, password) = args.slice(0, 3) + val filters = args.slice(3, args.length) + + val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2)) + val stream = new TwitterInputDStream(ssc, username, password, filters, + StorageLevel.MEMORY_ONLY_SER) + ssc.registerInputStream(stream) + + val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) + + val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) + .map{case (topic, count) => (count, topic)} + .transform(_.sortByKey(false)) + + val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10)) + .map{case (topic, count) => (count, topic)} + .transform(_.sortByKey(false)) + + + // Print popular hashtags + topCounts60.foreach(rdd => { + if (rdd.count() != 0) { + val topList = rdd.take(5) + println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) + topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} + } + }) + + topCounts10.foreach(rdd => { + if (rdd.count() != 0) { + val topList = rdd.take(5) + println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) + topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} + } + }) + + ssc.start() + } + +} diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala new file mode 100644 index 0000000000000000000000000000000000000000..c7e4855f3bdb92c306c2d4c340cc33cdc8a230cf --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala @@ -0,0 +1,70 @@ +package spark.streaming.examples.twitter + +import spark._ +import spark.streaming._ +import dstream.{NetworkReceiver, NetworkInputDStream} +import storage.StorageLevel +import twitter4j._ +import twitter4j.auth.BasicAuthorization +import collection.JavaConversions._ + +/* A stream of Twitter statuses, potentially filtered by one or more keywords. +* +* @constructor create a new Twitter stream using the supplied username and password to authenticate. +* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is +* such that this may return a sampled subset of all tweets during each interval. 
+*/ +class TwitterInputDStream( + @transient ssc_ : StreamingContext, + username: String, + password: String, + filters: Seq[String], + storageLevel: StorageLevel + ) extends NetworkInputDStream[Status](ssc_) { + + override def createReceiver(): NetworkReceiver[Status] = { + new TwitterReceiver(id, username, password, filters, storageLevel) + } +} + +class TwitterReceiver(streamId: Int, + username: String, + password: String, + filters: Seq[String], + storageLevel: StorageLevel + ) extends NetworkReceiver[Status](streamId) { + var twitterStream: TwitterStream = _ + lazy val blockGenerator = new BlockGenerator(storageLevel) + + protected override def onStart() { + blockGenerator.start() + twitterStream = new TwitterStreamFactory() + .getInstance(new BasicAuthorization(username, password)) + twitterStream.addListener(new StatusListener { + def onStatus(status: Status) = { + blockGenerator += status + } + // Unimplemented + def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} + def onTrackLimitationNotice(i: Int) {} + def onScrubGeo(l: Long, l1: Long) {} + def onStallWarning(stallWarning: StallWarning) {} + def onException(e: Exception) {} + }) + + val query: FilterQuery = new FilterQuery + if (filters.size > 0) { + query.track(filters.toArray) + twitterStream.filter(query) + } else { + twitterStream.sample() + } + logInfo("Twitter receiver started") + } + + protected override def onStop() { + blockGenerator.stop() + twitterStream.shutdown() + logInfo("Twitter receiver stopped") + } +} diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 6ba3026bcc39cde223c3905096bd4f007ab918f6..d5cda347a424f1aa187b383ed7dfe94dcb625d44 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -116,7 +116,8 @@ object SparkBuild extends Build { "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/", "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/", "Spray Repository" at "http://repo.spray.cc/", - "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/" + "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/", + "Twitter4J Repository" at "http://twitter4j.org/maven2/" ), libraryDependencies ++= Seq( @@ -134,6 +135,8 @@ object SparkBuild extends Build { "com.typesafe.akka" % "akka-slf4j" % "2.0.3", "it.unimi.dsi" % "fastutil" % "6.4.4", "colt" % "colt" % "1.2.0", + "org.twitter4j" % "twitter4j-core" % "3.0.2", + "org.twitter4j" % "twitter4j-stream" % "3.0.2", "cc.spray" % "spray-can" % "1.0-M2.1", "cc.spray" % "spray-server" % "1.0-M2.1", "org.apache.mesos" % "mesos" % "0.9.0-incubating" diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala index 11a7232d7b95fe36ce9f71808a23114871a5a91f..2f3adb39c2506590c9d65b80285eb6054deba111 100644 --- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala @@ -17,7 +17,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val jars = ssc.sc.jars val graph = ssc.graph val checkpointDir = ssc.checkpointDir - val checkpointInterval = ssc.checkpointInterval + val checkpointDuration: Duration = ssc.checkpointDuration def validate() { assert(master != null, "Checkpoint.master is null") diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 
beba9cfd4ffb0f9908a83f4a2045509f9a76c08b..c89fb7723ef4cba2ad26827003ef6ae7c5ffd720 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -2,7 +2,7 @@ package spark.streaming import spark.streaming.dstream._ import StreamingContext._ -import Time._ +//import Time._ import spark.{RDD, Logging} import spark.storage.StorageLevel @@ -47,7 +47,7 @@ abstract class DStream[T: ClassManifest] ( // ======================================================================= /** Time interval after which the DStream generates a RDD */ - def slideTime: Time + def slideDuration: Duration /** List of parent DStreams on which this DStream depends on */ def dependencies: List[DStream[_]] @@ -67,14 +67,14 @@ abstract class DStream[T: ClassManifest] ( protected[streaming] var zeroTime: Time = null // Duration for which the DStream will remember each RDD created - protected[streaming] var rememberDuration: Time = null + protected[streaming] var rememberDuration: Duration = null // Storage level of the RDDs in the stream protected[streaming] var storageLevel: StorageLevel = StorageLevel.NONE // Checkpoint details protected[streaming] val mustCheckpoint = false - protected[streaming] var checkpointInterval: Time = null + protected[streaming] var checkpointDuration: Duration = null protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]()) // Reference to whole DStream graph @@ -108,13 +108,13 @@ abstract class DStream[T: ClassManifest] ( * Enable periodic checkpointing of RDDs of this DStream * @param interval Time interval after which generated RDD will be checkpointed */ - def checkpoint(interval: Time): DStream[T] = { + def checkpoint(interval: Duration): DStream[T] = { if (isInitialized) { throw new UnsupportedOperationException( "Cannot change checkpoint interval of an DStream after streaming context has started") } persist() - checkpointInterval = interval + checkpointDuration = interval this } @@ -130,16 +130,16 @@ abstract class DStream[T: ClassManifest] ( } zeroTime = time - // Set the checkpoint interval to be slideTime or 10 seconds, which ever is larger - if (mustCheckpoint && checkpointInterval == null) { - checkpointInterval = slideTime.max(Seconds(10)) - logInfo("Checkpoint interval automatically set to " + checkpointInterval) + // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger + if (mustCheckpoint && checkpointDuration == null) { + checkpointDuration = slideDuration.max(Seconds(10)) + logInfo("Checkpoint interval automatically set to " + checkpointDuration) } // Set the minimum value of the rememberDuration if not already set - var minRememberDuration = slideTime - if (checkpointInterval != null && minRememberDuration <= checkpointInterval) { - minRememberDuration = checkpointInterval * 2 // times 2 just to be sure that the latest checkpoint is not forgetten + var minRememberDuration = slideDuration + if (checkpointDuration != null && minRememberDuration <= checkpointDuration) { + minRememberDuration = checkpointDuration * 2 // times 2 just to be sure that the latest checkpoint is not forgetten } if (rememberDuration == null || rememberDuration < minRememberDuration) { rememberDuration = minRememberDuration @@ -153,37 +153,37 @@ abstract class DStream[T: ClassManifest] ( assert(rememberDuration != null, "Remember duration is set to null") assert( - !mustCheckpoint || checkpointInterval != null, + !mustCheckpoint || checkpointDuration != null, "The 
checkpoint interval for " + this.getClass.getSimpleName + " has not been set. " + " Please use DStream.checkpoint() to set the interval." ) assert( - checkpointInterval == null || checkpointInterval >= slideTime, + checkpointDuration == null || checkpointDuration >= slideDuration, "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + - checkpointInterval + " which is lower than its slide time (" + slideTime + "). " + - "Please set it to at least " + slideTime + "." + checkpointDuration + " which is lower than its slide time (" + slideDuration + "). " + + "Please set it to at least " + slideDuration + "." ) assert( - checkpointInterval == null || checkpointInterval.isMultipleOf(slideTime), + checkpointDuration == null || checkpointDuration.isMultipleOf(slideDuration), "The checkpoint interval for " + this.getClass.getSimpleName + " has been set to " + - checkpointInterval + " which not a multiple of its slide time (" + slideTime + "). " + - "Please set it to a multiple " + slideTime + "." + checkpointDuration + " which not a multiple of its slide time (" + slideDuration + "). " + + "Please set it to a multiple " + slideDuration + "." ) assert( - checkpointInterval == null || storageLevel != StorageLevel.NONE, + checkpointDuration == null || storageLevel != StorageLevel.NONE, "" + this.getClass.getSimpleName + " has been marked for checkpointing but the storage " + "level has not been set to enable persisting. Please use DStream.persist() to set the " + "storage level to use memory for better checkpointing performance." ) assert( - checkpointInterval == null || rememberDuration > checkpointInterval, + checkpointDuration == null || rememberDuration > checkpointDuration, "The remember duration for " + this.getClass.getSimpleName + " has been set to " + rememberDuration + " which is not more than the checkpoint interval (" + - checkpointInterval + "). Please set it to higher than " + checkpointInterval + "." + checkpointDuration + "). Please set it to higher than " + checkpointDuration + "." ) val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds @@ -200,9 +200,9 @@ abstract class DStream[T: ClassManifest] ( dependencies.foreach(_.validate()) - logInfo("Slide time = " + slideTime) + logInfo("Slide time = " + slideDuration) logInfo("Storage level = " + storageLevel) - logInfo("Checkpoint interval = " + checkpointInterval) + logInfo("Checkpoint interval = " + checkpointDuration) logInfo("Remember duration = " + rememberDuration) logInfo("Initialized and validated " + this) } @@ -224,7 +224,7 @@ abstract class DStream[T: ClassManifest] ( dependencies.foreach(_.setGraph(graph)) } - protected[streaming] def remember(duration: Time) { + protected[streaming] def remember(duration: Duration) { if (duration != null && duration > rememberDuration) { rememberDuration = duration logInfo("Duration for remembering RDDs set to " + rememberDuration + " for " + this) @@ -232,11 +232,11 @@ abstract class DStream[T: ClassManifest] ( dependencies.foreach(_.remember(parentRememberDuration)) } - /** This method checks whether the 'time' is valid wrt slideTime for generating RDD */ + /** This method checks whether the 'time' is valid wrt slideDuration for generating RDD */ protected def isTimeValid(time: Time): Boolean = { if (!isInitialized) { throw new Exception (this + " has not been initialized") - } else if (time <= zeroTime || ! (time - zeroTime).isMultipleOf(slideTime)) { + } else if (time <= zeroTime || ! 
(time - zeroTime).isMultipleOf(slideDuration)) { false } else { true @@ -266,7 +266,7 @@ abstract class DStream[T: ClassManifest] ( newRDD.persist(storageLevel) logInfo("Persisting RDD " + newRDD.id + " for time " + time + " to " + storageLevel + " at time " + time) } - if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) { + if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) { newRDD.checkpoint() logInfo("Marking RDD " + newRDD.id + " for time " + time + " for checkpointing at time " + time) } @@ -528,21 +528,21 @@ abstract class DStream[T: ClassManifest] ( /** * Return a new DStream which is computed based on windowed batches of this DStream. * The new DStream generates RDDs with the same interval as this DStream. - * @param windowTime width of the window; must be a multiple of this DStream's interval. + * @param windowDuration width of the window; must be a multiple of this DStream's interval. * @return */ - def window(windowTime: Time): DStream[T] = window(windowTime, this.slideTime) + def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration) /** * Return a new DStream which is computed based on windowed batches of this DStream. - * @param windowTime duration (i.e., width) of the window; + * @param windowDuration duration (i.e., width) of the window; * must be a multiple of this DStream's interval - * @param slideTime sliding interval of the window (i.e., the interval after which + * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's interval */ - def window(windowTime: Time, slideTime: Time): DStream[T] = { - new WindowedDStream(this, windowTime, slideTime) + def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = { + new WindowedDStream(this, windowDuration, slideDuration) } /** @@ -550,40 +550,40 @@ abstract class DStream[T: ClassManifest] ( * This is equivalent to window(batchTime, batchTime). * @param batchTime tumbling window duration; must be a multiple of this DStream's interval */ - def tumble(batchTime: Time): DStream[T] = window(batchTime, batchTime) + def tumble(batchTime: Duration): DStream[T] = window(batchTime, batchTime) /** * Returns a new DStream in which each RDD has a single element generated by reducing all - * elements in a window over this DStream. windowTime and slideTime are as defined in the - * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc) + * elements in a window over this DStream. windowDuration and slideDuration are as defined in the + * window() operation. 
This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc) */ - def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Time, slideTime: Time): DStream[T] = { - this.window(windowTime, slideTime).reduce(reduceFunc) + def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T] = { + this.window(windowDuration, slideDuration).reduce(reduceFunc) } def reduceByWindow( reduceFunc: (T, T) => T, invReduceFunc: (T, T) => T, - windowTime: Time, - slideTime: Time + windowDuration: Duration, + slideDuration: Duration ): DStream[T] = { this.map(x => (1, x)) - .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, 1) + .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) .map(_._2) } /** * Returns a new DStream in which each RDD has a single element generated by counting the number - * of elements in a window over this DStream. windowTime and slideTime are as defined in the - * window() operation. This is equivalent to window(windowTime, slideTime).count() + * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the + * window() operation. This is equivalent to window(windowDuration, slideDuration).count() */ - def countByWindow(windowTime: Time, slideTime: Time): DStream[Int] = { - this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowTime, slideTime) + def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Int] = { + this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration) } /** * Returns a new DStream by unifying data of another DStream with this DStream. - * @param that Another DStream having the same interval (i.e., slideTime) as this DStream. + * @param that Another DStream having the same slideDuration as this DStream. 
*/ def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) @@ -599,13 +599,13 @@ abstract class DStream[T: ClassManifest] ( */ def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { val rdds = new ArrayBuffer[RDD[T]]() - var time = toTime.floor(slideTime) + var time = toTime.floor(slideDuration) while (time >= zeroTime && time >= fromTime) { getOrCompute(time) match { case Some(rdd) => rdds += rdd case None => //throw new Exception("Could not get RDD for time " + time) } - time -= slideTime + time -= slideDuration } rdds.toSeq } diff --git a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala index c72429370e8580828a8f9ad1f1e92a1e80eb28eb..bc4a40d7bccd4cd1b277e48119bb9a677a9d999d 100644 --- a/streaming/src/main/scala/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/spark/streaming/DStreamGraph.scala @@ -12,8 +12,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { private val outputStreams = new ArrayBuffer[DStream[_]]() private[streaming] var zeroTime: Time = null - private[streaming] var batchDuration: Time = null - private[streaming] var rememberDuration: Time = null + private[streaming] var batchDuration: Duration = null + private[streaming] var rememberDuration: Duration = null private[streaming] var checkpointInProgress = false private[streaming] def start(time: Time) { @@ -41,7 +41,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { } } - private[streaming] def setBatchDuration(duration: Time) { + private[streaming] def setBatchDuration(duration: Duration) { this.synchronized { if (batchDuration != null) { throw new Exception("Batch duration already set as " + batchDuration + @@ -51,7 +51,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { batchDuration = duration } - private[streaming] def remember(duration: Time) { + private[streaming] def remember(duration: Duration) { this.synchronized { if (rememberDuration != null) { throw new Exception("Batch duration already set as " + batchDuration + diff --git a/streaming/src/main/scala/spark/streaming/Duration.scala b/streaming/src/main/scala/spark/streaming/Duration.scala new file mode 100644 index 0000000000000000000000000000000000000000..e4dc579a170a4214ffa3a9ee959b160471a26dbe --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/Duration.scala @@ -0,0 +1,62 @@ +package spark.streaming + +case class Duration (private val millis: Long) { + + def < (that: Duration): Boolean = (this.millis < that.millis) + + def <= (that: Duration): Boolean = (this.millis <= that.millis) + + def > (that: Duration): Boolean = (this.millis > that.millis) + + def >= (that: Duration): Boolean = (this.millis >= that.millis) + + def + (that: Duration): Duration = new Duration(millis + that.millis) + + def - (that: Duration): Duration = new Duration(millis - that.millis) + + def * (times: Int): Duration = new Duration(millis * times) + + def / (that: Duration): Long = millis / that.millis + + def isMultipleOf(that: Duration): Boolean = + (this.millis % that.millis == 0) + + def min(that: Duration): Duration = if (this < that) this else that + + def max(that: Duration): Duration = if (this > that) this else that + + def isZero: Boolean = (this.millis == 0) + + override def toString: String = (millis.toString + " ms") + + def toFormattedString: String = millis.toString + + def milliseconds: Long = millis +} + + +/** + * Helper object that creates 
instance of [[spark.streaming.Duration]] representing + * a given number of milliseconds. + */ +object Milliseconds { + def apply(milliseconds: Long) = new Duration(milliseconds) +} + +/** + * Helper object that creates instance of [[spark.streaming.Duration]] representing + * a given number of seconds. + */ +object Seconds { + def apply(seconds: Long) = new Duration(seconds * 1000) +} + +/** + * Helper object that creates instance of [[spark.streaming.Duration]] representing + * a given number of minutes. + */ +object Minutes { + def apply(minutes: Long) = new Duration(minutes * 60000) +} + + diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala index fa0b7ce19d64a3ab103bdfc76db8d86c30a992df..dc21dfb72228801bb1db803901ec81e6b76ae411 100644 --- a/streaming/src/main/scala/spark/streaming/Interval.scala +++ b/streaming/src/main/scala/spark/streaming/Interval.scala @@ -1,16 +1,16 @@ package spark.streaming private[streaming] -case class Interval(beginTime: Time, endTime: Time) { - def this(beginMs: Long, endMs: Long) = this(Time(beginMs), new Time(endMs)) +class Interval(val beginTime: Time, val endTime: Time) { + def this(beginMs: Long, endMs: Long) = this(new Time(beginMs), new Time(endMs)) - def duration(): Time = endTime - beginTime + def duration(): Duration = endTime - beginTime - def + (time: Time): Interval = { + def + (time: Duration): Interval = { new Interval(beginTime + time, endTime + time) } - def - (time: Time): Interval = { + def - (time: Duration): Interval = { new Interval(beginTime - time, endTime - time) } @@ -27,24 +27,14 @@ case class Interval(beginTime: Time, endTime: Time) { def >= (that: Interval) = !(this < that) - def next(): Interval = { - this + (endTime - beginTime) - } - - def isZero = (beginTime.isZero && endTime.isZero) - - def toFormattedString = beginTime.toFormattedString + "-" + endTime.toFormattedString - - override def toString = "[" + beginTime + ", " + endTime + "]" + override def toString = "[" + beginTime + ", " + endTime + "]" } object Interval { - def zero() = new Interval (Time.zero, Time.zero) - - def currentInterval(intervalDuration: Time): Interval = { - val time = Time(System.currentTimeMillis) - val intervalBegin = time.floor(intervalDuration) - Interval(intervalBegin, intervalBegin + intervalDuration) + def currentInterval(duration: Duration): Interval = { + val time = new Time(System.currentTimeMillis) + val intervalBegin = time.floor(duration) + new Interval(intervalBegin, intervalBegin + duration) } } diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index b0a208e67f32856521e4d9b6b356f65af8ed23be..482d01300ddd7aa9d3d8190c4725a0656e6b4475 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -21,14 +21,10 @@ extends Serializable { def ssc = self.ssc - def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = { + private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = { new HashPartitioner(numPartitions) } - /* ---------------------------------- */ - /* DStream operations for key-value pairs */ - /* ---------------------------------- */ - def groupByKey(): DStream[(K, Seq[V])] = { groupByKey(defaultPartitioner()) } @@ -69,59 +65,59 @@ extends Serializable { self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: 
Long) => x + y, numPartitions) } - def groupByKeyAndWindow(windowTime: Time, slideTime: Time): DStream[(K, Seq[V])] = { - groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner()) + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = { + groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) } def groupByKeyAndWindow( - windowTime: Time, - slideTime: Time, + windowDuration: Duration, + slideDuration: Duration, numPartitions: Int ): DStream[(K, Seq[V])] = { - groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner(numPartitions)) + groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions)) } def groupByKeyAndWindow( - windowTime: Time, - slideTime: Time, + windowDuration: Duration, + slideDuration: Duration, partitioner: Partitioner ): DStream[(K, Seq[V])] = { - self.window(windowTime, slideTime).groupByKey(partitioner) + self.window(windowDuration, slideDuration).groupByKey(partitioner) } def reduceByKeyAndWindow( reduceFunc: (V, V) => V, - windowTime: Time + windowDuration: Duration ): DStream[(K, V)] = { - reduceByKeyAndWindow(reduceFunc, windowTime, self.slideTime, defaultPartitioner()) + reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner()) } def reduceByKeyAndWindow( reduceFunc: (V, V) => V, - windowTime: Time, - slideTime: Time + windowDuration: Duration, + slideDuration: Duration ): DStream[(K, V)] = { - reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner()) + reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner()) } def reduceByKeyAndWindow( reduceFunc: (V, V) => V, - windowTime: Time, - slideTime: Time, + windowDuration: Duration, + slideDuration: Duration, numPartitions: Int ): DStream[(K, V)] = { - reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions)) + reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) } def reduceByKeyAndWindow( reduceFunc: (V, V) => V, - windowTime: Time, - slideTime: Time, + windowDuration: Duration, + slideDuration: Duration, partitioner: Partitioner ): DStream[(K, V)] = { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) self.reduceByKey(cleanedReduceFunc, partitioner) - .window(windowTime, slideTime) + .window(windowDuration, slideDuration) .reduceByKey(cleanedReduceFunc, partitioner) } @@ -134,51 +130,51 @@ extends Serializable { def reduceByKeyAndWindow( reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, - windowTime: Time, - slideTime: Time + windowDuration: Duration, + slideDuration: Duration ): DStream[(K, V)] = { reduceByKeyAndWindow( - reduceFunc, invReduceFunc, windowTime, slideTime, defaultPartitioner()) + reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner()) } def reduceByKeyAndWindow( reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, - windowTime: Time, - slideTime: Time, + windowDuration: Duration, + slideDuration: Duration, numPartitions: Int ): DStream[(K, V)] = { reduceByKeyAndWindow( - reduceFunc, invReduceFunc, windowTime, slideTime, defaultPartitioner(numPartitions)) + reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) } def reduceByKeyAndWindow( reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, - windowTime: Time, - slideTime: Time, + windowDuration: Duration, + slideDuration: Duration, partitioner: Partitioner ): DStream[(K, V)] = { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) val 
cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc) new ReducedWindowedDStream[K, V]( - self, cleanedReduceFunc, cleanedInvReduceFunc, windowTime, slideTime, partitioner) + self, cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, partitioner) } def countByKeyAndWindow( - windowTime: Time, - slideTime: Time, + windowDuration: Duration, + slideDuration: Duration, numPartitions: Int = self.ssc.sc.defaultParallelism ): DStream[(K, Long)] = { self.map(x => (x._1, 1L)).reduceByKeyAndWindow( (x: Long, y: Long) => x + y, (x: Long, y: Long) => x - y, - windowTime, - slideTime, + windowDuration, + slideDuration, numPartitions ) } diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index eb40affe6d5798f19537ba40f3045bf898bb56b9..c04ed37de8b21708bdc7ef295441129386f4965b 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -14,7 +14,7 @@ class Scheduler(ssc: StreamingContext) extends Logging { val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt val jobManager = new JobManager(ssc, concurrentJobs) - val checkpointWriter = if (ssc.checkpointInterval != null && ssc.checkpointDir != null) { + val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { new CheckpointWriter(ssc.checkpointDir) } else { null @@ -22,7 +22,8 @@ class Scheduler(ssc: StreamingContext) extends Logging { val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock") val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] - val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, generateRDDs(_)) + val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, + longTime => generateRDDs(new Time(longTime))) def start() { // If context was started from checkpoint, then restart timer such that @@ -41,7 +42,7 @@ class Scheduler(ssc: StreamingContext) extends Logging { timer.restart(graph.zeroTime.milliseconds) logInfo("Scheduler's timer restarted") } else { - val firstTime = Time(timer.start()) + val firstTime = new Time(timer.start()) graph.start(firstTime - ssc.graph.batchDuration) logInfo("Scheduler's timer started") } @@ -64,7 +65,7 @@ class Scheduler(ssc: StreamingContext) extends Logging { } private def doCheckpoint(time: Time) { - if (ssc.checkpointInterval != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointInterval)) { + if (ssc.checkpointDuration != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) { val startTime = System.currentTimeMillis() ssc.graph.updateCheckpointData(time) checkpointWriter.write(new Checkpoint(ssc, time)) diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 215246ba2eff02e6902d2aaad092a39f8d4db2b8..14500bdcb17a07ccf36e7cdf7f91f48606583a6e 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -26,7 +26,7 @@ import java.util.UUID class StreamingContext private ( sc_ : SparkContext, cp_ : Checkpoint, - batchDur_ : Time + batchDur_ : Duration ) extends Logging { /** @@ -34,7 +34,7 @@ class StreamingContext private ( * @param sparkContext Existing SparkContext * @param batchDuration The time interval at which streaming data will be divided into batches */ - def 
this(sparkContext: SparkContext, batchDuration: Time) = this(sparkContext, null, batchDuration) + def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration) /** * Creates a StreamingContext by providing the details necessary for creating a new SparkContext. @@ -42,7 +42,7 @@ class StreamingContext private ( * @param frameworkName A name for your job, to display on the cluster web UI * @param batchDuration The time interval at which streaming data will be divided into batches */ - def this(master: String, frameworkName: String, batchDuration: Time) = + def this(master: String, frameworkName: String, batchDuration: Duration) = this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration) /** @@ -96,7 +96,7 @@ class StreamingContext private ( } } - protected[streaming] var checkpointInterval: Time = if (isCheckpointPresent) cp_.checkpointInterval else null + protected[streaming] var checkpointDuration: Duration = if (isCheckpointPresent) cp_.checkpointDuration else null protected[streaming] var receiverJobThread: Thread = null protected[streaming] var scheduler: Scheduler = null @@ -107,7 +107,7 @@ class StreamingContext private ( * if the developer wishes to query old data outside the DStream computation). * @param duration Minimum duration that each DStream should remember its RDDs */ - def remember(duration: Time) { + def remember(duration: Duration) { graph.remember(duration) } @@ -117,14 +117,14 @@ class StreamingContext private ( * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored * @param interval checkpoint interval */ - def checkpoint(directory: String, interval: Time = null) { + def checkpoint(directory: String, interval: Duration = null) { if (directory != null) { sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory)) checkpointDir = directory - checkpointInterval = interval + checkpointDuration = interval } else { checkpointDir = null - checkpointInterval = null + checkpointDuration = null } } @@ -327,7 +327,7 @@ class StreamingContext private ( graph.validate() assert( - checkpointDir == null || checkpointInterval != null, + checkpointDir == null || checkpointDuration != null, "Checkpoint directory has been set, but the graph checkpointing interval has " + "not been set. Please use StreamingContext.checkpoint() to set the interval." ) @@ -337,8 +337,8 @@ class StreamingContext private ( * Starts the execution of the streams. */ def start() { - if (checkpointDir != null && checkpointInterval == null && graph != null) { - checkpointInterval = graph.batchDuration + if (checkpointDir != null && checkpointDuration == null && graph != null) { + checkpointDuration = graph.batchDuration } validate() diff --git a/streaming/src/main/scala/spark/streaming/Time.scala b/streaming/src/main/scala/spark/streaming/Time.scala index 3c6fd5d9671612e52f95174827d092ffa7ee5ce4..5daeb761ddd670b6daa50ad5226ffb07496e5218 100644 --- a/streaming/src/main/scala/spark/streaming/Time.scala +++ b/streaming/src/main/scala/spark/streaming/Time.scala @@ -1,14 +1,15 @@ package spark.streaming /** - * This is a simple class that represents time. Internally, it represents time as UTC. - * The recommended way to create instances of Time is to use helper objects - * [[spark.streaming.Milliseconds]], [[spark.streaming.Seconds]], and [[spark.streaming.Minutes]]. - * @param millis Time in UTC. + * This is a simple class that represents an absolute instant of time. 
+ * Internally, it represents time as the difference, measured in milliseconds, between the current + * time and midnight, January 1, 1970 UTC. This is the same format as what is returned by + * System.currentTimeMillis. */ - case class Time(private val millis: Long) { - + + def milliseconds: Long = millis + def < (that: Time): Boolean = (this.millis < that.millis) def <= (that: Time): Boolean = (this.millis <= that.millis) @@ -17,63 +18,25 @@ case class Time(private val millis: Long) { def >= (that: Time): Boolean = (this.millis >= that.millis) - def + (that: Time): Time = Time(millis + that.millis) - - def - (that: Time): Time = Time(millis - that.millis) - - def * (times: Int): Time = Time(millis * times) + def + (that: Duration): Time = new Time(millis + that.milliseconds) + + def - (that: Time): Duration = new Duration(millis - that.millis) - def / (that: Time): Long = millis / that.millis + def - (that: Duration): Time = new Time(millis - that.milliseconds) - def floor(that: Time): Time = { - val t = that.millis + def floor(that: Duration): Time = { + val t = that.milliseconds val m = math.floor(this.millis / t).toLong - Time(m * t) + new Time(m * t) } - def isMultipleOf(that: Time): Boolean = - (this.millis % that.millis == 0) + def isMultipleOf(that: Duration): Boolean = + (this.millis % that.milliseconds == 0) def min(that: Time): Time = if (this < that) this else that def max(that: Time): Time = if (this > that) this else that - def isZero: Boolean = (this.millis == 0) - override def toString: String = (millis.toString + " ms") - def toFormattedString: String = millis.toString - - def milliseconds: Long = millis -} - -private[streaming] object Time { - val zero = Time(0) - - implicit def toTime(long: Long) = Time(long) -} - -/** - * Helper object that creates instance of [[spark.streaming.Time]] representing - * a given number of milliseconds. - */ -object Milliseconds { - def apply(milliseconds: Long) = Time(milliseconds) -} - -/** - * Helper object that creates instance of [[spark.streaming.Time]] representing - * a given number of seconds. - */ -object Seconds { - def apply(seconds: Long) = Time(seconds * 1000) -} - -/** - * Helper object that creates instance of [[spark.streaming.Time]] representing - * a given number of minutes. 
- */ -object Minutes { - def apply(minutes: Long) = Time(minutes * 60000) -} - +} \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala index bc23d423d3559956abd9c29459edaad5a445c031..ddb1bf6b28fdb4466dacf1f6b827e8086dc9090c 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/CoGroupedDStream.scala @@ -2,7 +2,7 @@ package spark.streaming.dstream import spark.{RDD, Partitioner} import spark.rdd.CoGroupedRDD -import spark.streaming.{Time, DStream} +import spark.streaming.{Time, DStream, Duration} private[streaming] class CoGroupedDStream[K : ClassManifest]( @@ -18,13 +18,13 @@ class CoGroupedDStream[K : ClassManifest]( throw new IllegalArgumentException("Array of parents have different StreamingContexts") } - if (parents.map(_.slideTime).distinct.size > 1) { + if (parents.map(_.slideDuration).distinct.size > 1) { throw new IllegalArgumentException("Array of parents have different slide times") } override def dependencies = parents.toList - override def slideTime = parents.head.slideTime + override def slideDuration: Duration = parents.head.slideDuration override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = { val part = partitioner diff --git a/streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala b/streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala deleted file mode 100644 index d737ba1ecc39a1b78f707642aee7d41914ef77d4..0000000000000000000000000000000000000000 --- a/streaming/src/main/scala/spark/streaming/dstream/DataHandler.scala +++ /dev/null @@ -1,83 +0,0 @@ -package spark.streaming.dstream - -import java.util.concurrent.ArrayBlockingQueue -import scala.collection.mutable.ArrayBuffer -import spark.Logging -import spark.streaming.util.{RecurringTimer, SystemClock} -import spark.storage.StorageLevel - - -/** - * This is a helper object that manages the data received from the socket. It divides - * the object received into small batches of 100s of milliseconds, pushes them as - * blocks into the block manager and reports the block IDs to the network input - * tracker. It starts two threads, one to periodically start a new batch and prepare - * the previous batch of as a block, the other to push the blocks into the block - * manager. 
- */ - class DataHandler[T](receiver: NetworkReceiver[T], storageLevel: StorageLevel) - extends Serializable with Logging { - - case class Block(id: String, iterator: Iterator[T], metadata: Any = null) - - val clock = new SystemClock() - val blockInterval = 200L - val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer) - val blockStorageLevel = storageLevel - val blocksForPushing = new ArrayBlockingQueue[Block](1000) - val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } - - var currentBuffer = new ArrayBuffer[T] - - def createBlock(blockId: String, iterator: Iterator[T]) : Block = { - new Block(blockId, iterator) - } - - def start() { - blockIntervalTimer.start() - blockPushingThread.start() - logInfo("Data handler started") - } - - def stop() { - blockIntervalTimer.stop() - blockPushingThread.interrupt() - logInfo("Data handler stopped") - } - - def += (obj: T) { - currentBuffer += obj - } - - def updateCurrentBuffer(time: Long) { - try { - val newBlockBuffer = currentBuffer - currentBuffer = new ArrayBuffer[T] - if (newBlockBuffer.size > 0) { - val blockId = "input-" + receiver.streamId + "- " + (time - blockInterval) - val newBlock = createBlock(blockId, newBlockBuffer.toIterator) - blocksForPushing.add(newBlock) - } - } catch { - case ie: InterruptedException => - logInfo("Block interval timer thread interrupted") - case e: Exception => - receiver.stop() - } - } - - def keepPushingBlocks() { - logInfo("Block pushing thread started") - try { - while(true) { - val block = blocksForPushing.take() - receiver.pushBlock(block.id, block.iterator, block.metadata, storageLevel) - } - } catch { - case ie: InterruptedException => - logInfo("Block pushing thread interrupted") - case e: Exception => - receiver.stop() - } - } - } \ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala index 1cbb4d536e3c2a2bfc47c0e554e991960e0cdc2b..e993164f9949231fd0e583c0df5a7902afda7e34 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FilteredDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD private[streaming] @@ -11,7 +11,7 @@ class FilteredDStream[T: ClassManifest]( override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[T]] = { parent.getOrCompute(validTime).map(_.filter(filterFunc)) diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala index 11ed8cf317c56a1294f787f5ccf585c7faff6a5b..cabd34f5f2e763a0a3c32c77ff85836d64cf100a 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMapValuedDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD import spark.SparkContext._ @@ -12,7 +12,7 @@ class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest] override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + 
override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[(K, U)]] = { parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)) diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala index a13b4c9ff98d59ddd82a85eeb2826a83020a4438..a69af6058903b850ffbb32319ece644fbd8d7de5 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FlatMappedDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD private[streaming] @@ -11,7 +11,7 @@ class FlatMappedDStream[T: ClassManifest, U: ClassManifest]( override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala index ff73225e0f3e4d9c32dc7950ea938b5883f86a37..ca70e72e56cf08c9e74d96ee36b955f431adafdb 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala @@ -97,13 +97,13 @@ private[streaming] object SparkFlumeEvent { private[streaming] class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { override def append(event : AvroFlumeEvent) : Status = { - receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event) + receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event) Status.OK } override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { events.foreach (event => - receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)) + receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)) Status.OK } } @@ -118,19 +118,19 @@ class FlumeReceiver( storageLevel: StorageLevel ) extends NetworkReceiver[SparkFlumeEvent](streamId) { - lazy val dataHandler = new DataHandler(this, storageLevel) + lazy val blockGenerator = new BlockGenerator(storageLevel) protected override def onStart() { val responder = new SpecificResponder( classOf[AvroSourceProtocol], new FlumeEventServer(this)); val server = new NettyServer(responder, new InetSocketAddress(host, port)); - dataHandler.start() + blockGenerator.start() server.start() logInfo("Flume receiver started") } protected override def onStop() { - dataHandler.stop() + blockGenerator.stop() logInfo("Flume receiver stopped") } diff --git a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala index 41c629a225018528f44cf7a5a7425c3ed1336098..ee69ea5177921e8b172427b6e6f579f18f70a7a3 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/ForEachDStream.scala @@ -1,7 +1,7 @@ package spark.streaming.dstream import spark.RDD -import spark.streaming.{DStream, Job, Time} +import spark.streaming.{Duration, DStream, Job, Time} private[streaming] class ForEachDStream[T: ClassManifest] ( @@ -11,7 +11,7 @@ class ForEachDStream[T: ClassManifest] ( override def 
dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[Unit]] = None diff --git a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala index 92ea503caeff96db8f06e63ede3ca0477481488d..b589cbd4d50b79ca03959f1de71b9d69f00d5c56 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/GlommedDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD private[streaming] @@ -9,7 +9,7 @@ class GlommedDStream[T: ClassManifest](parent: DStream[T]) override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[Array[T]]] = { parent.getOrCompute(validTime).map(_.glom()) diff --git a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala index 4959c66b060554cc3e62f9a45f8f40faec55b71f..980ca5177eb1d20a5b8b8a85155e914ff94e1511 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/InputDStream.scala @@ -1,13 +1,13 @@ package spark.streaming.dstream -import spark.streaming.{StreamingContext, DStream} +import spark.streaming.{Duration, StreamingContext, DStream} abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) { override def dependencies = List() - override def slideTime = { + override def slideDuration: Duration = { if (ssc == null) throw new Exception("ssc is null") if (ssc.graph.batchDuration == null) throw new Exception("batchDuration is null") ssc.graph.batchDuration diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index 175c75bcb94f3a9337322b30639d5dc5a0db0030..25988a2ce7b943f1a53b91e1101dfed0531b108b 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -110,20 +110,19 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String, val ZK_TIMEOUT = 10000 // Handles pushing data into the BlockManager - lazy protected val dataHandler = new DataHandler(this, storageLevel) + lazy protected val blockGenerator = new BlockGenerator(storageLevel) // Keeps track of the current offsets. 
Maps from (broker, topic, group, part) -> Offset lazy val offsets = HashMap[KafkaPartitionKey, Long]() // Connection to Kafka var consumerConnector : ZookeeperConsumerConnector = null def onStop() { - dataHandler.stop() + blockGenerator.stop() } def onStart() { - // Starting the DataHandler that buffers blocks and pushes them into them BlockManager - dataHandler.start() + blockGenerator.start() // In case we are using multiple Threads to handle Kafka Messages val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) @@ -171,8 +170,8 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String, private class MessageHandler(stream: KafkaStream[String]) extends Runnable { def run() { logInfo("Starting MessageHandler.") - stream.takeWhile { msgAndMetadata => - dataHandler += msgAndMetadata.message + stream.takeWhile { msgAndMetadata => + blockGenerator += msgAndMetadata.message // Updating the offet. The key is (broker, topic, group, partition). val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic, @@ -189,7 +188,7 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String, // NOT USED - Originally intended for fault-tolerance // class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel) - // extends DataHandler[Any](receiver, storageLevel) { + // extends BufferingBlockCreator[Any](receiver, storageLevel) { // override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = { // // Creates a new Block with Kafka-specific Metadata diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala index daf78c6893a12b67a5eba81978e7af9b2bf0b595..848afecfad599fa51daf60b5576c408f5b957d7f 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/MapPartitionedDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD private[streaming] @@ -12,7 +12,7 @@ class MapPartitionedDStream[T: ClassManifest, U: ClassManifest]( override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning)) diff --git a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala index 689caeef0ef9f965b4da9361c01e26b8993bf9c4..6055aa6a0597badea5922adcc2e0ec350e69cd72 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/MapValuedDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD import spark.SparkContext._ @@ -12,7 +12,7 @@ class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest]( override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[(K, U)]] = { parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc)) diff --git 
a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala index 786b9966f24862bcf050ed82320e175cc952fc72..20818a0cab108f3c578c821559bbaa4301170e5a 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/MappedDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD private[streaming] @@ -11,7 +11,7 @@ class MappedDStream[T: ClassManifest, U: ClassManifest] ( override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(_.map[U](mapFunc)) diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala index 41276da8bb5533558011de347f6275f6af48ad46..18e62a0e33fe65aeed88ec0dc88418286190e6dc 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala @@ -14,6 +14,8 @@ import akka.actor.{Props, Actor} import akka.pattern.ask import akka.dispatch.Await import akka.util.duration._ +import spark.streaming.util.{RecurringTimer, SystemClock} +import java.util.concurrent.ArrayBlockingQueue abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext) extends InputDStream[T](ssc_) { @@ -154,4 +156,77 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri tracker ! DeregisterReceiver(streamId, msg) } } + + /** + * Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into + * appropriately named blocks at regular intervals. This class starts two threads, + * one to periodically start a new batch and prepare the previous batch of as a block, + * the other to push the blocks into the block manager. 
+ */ + class BlockGenerator(storageLevel: StorageLevel) + extends Serializable with Logging { + + case class Block(id: String, iterator: Iterator[T], metadata: Any = null) + + val clock = new SystemClock() + val blockInterval = 200L + val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer) + val blockStorageLevel = storageLevel + val blocksForPushing = new ArrayBlockingQueue[Block](1000) + val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } + + var currentBuffer = new ArrayBuffer[T] + + def start() { + blockIntervalTimer.start() + blockPushingThread.start() + logInfo("Data handler started") + } + + def stop() { + blockIntervalTimer.stop() + blockPushingThread.interrupt() + logInfo("Data handler stopped") + } + + def += (obj: T) { + currentBuffer += obj + } + + private def createBlock(blockId: String, iterator: Iterator[T]) : Block = { + new Block(blockId, iterator) + } + + private def updateCurrentBuffer(time: Long) { + try { + val newBlockBuffer = currentBuffer + currentBuffer = new ArrayBuffer[T] + if (newBlockBuffer.size > 0) { + val blockId = "input-" + NetworkReceiver.this.streamId + "- " + (time - blockInterval) + val newBlock = createBlock(blockId, newBlockBuffer.toIterator) + blocksForPushing.add(newBlock) + } + } catch { + case ie: InterruptedException => + logInfo("Block interval timer thread interrupted") + case e: Exception => + NetworkReceiver.this.stop() + } + } + + private def keepPushingBlocks() { + logInfo("Block pushing thread started") + try { + while(true) { + val block = blocksForPushing.take() + NetworkReceiver.this.pushBlock(block.id, block.iterator, block.metadata, storageLevel) + } + } catch { + case ie: InterruptedException => + logInfo("Block pushing thread interrupted") + case e: Exception => + NetworkReceiver.this.stop() + } + } + } } diff --git a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala index d289ed2a3f49b930c1efee2d9a38bf18742c15e1..733d5c4a25271cbd741083dd392590bcb176298b 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -9,26 +9,26 @@ import spark.SparkContext._ import spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer -import spark.streaming.{Interval, Time, DStream} +import spark.streaming.{Duration, Interval, Time, DStream} private[streaming] class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( parent: DStream[(K, V)], reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, - _windowTime: Time, - _slideTime: Time, + _windowDuration: Duration, + _slideDuration: Duration, partitioner: Partitioner ) extends DStream[(K,V)](parent.ssc) { - assert(_windowTime.isMultipleOf(parent.slideTime), - "The window duration of ReducedWindowedDStream (" + _slideTime + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")" + assert(_windowDuration.isMultipleOf(parent.slideDuration), + "The window duration of ReducedWindowedDStream (" + _slideDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" ) - assert(_slideTime.isMultipleOf(parent.slideTime), - "The slide duration of ReducedWindowedDStream (" + _slideTime + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")" + 
assert(_slideDuration.isMultipleOf(parent.slideDuration), + "The slide duration of ReducedWindowedDStream (" + _slideDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" ) // Reduce each batch of data using reduceByKey which will be further reduced by window @@ -39,15 +39,15 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( super.persist(StorageLevel.MEMORY_ONLY_SER) reducedStream.persist(StorageLevel.MEMORY_ONLY_SER) - def windowTime: Time = _windowTime + def windowDuration: Duration = _windowDuration override def dependencies = List(reducedStream) - override def slideTime: Time = _slideTime + override def slideDuration: Duration = _slideDuration override val mustCheckpoint = true - override def parentRememberDuration: Time = rememberDuration + windowTime + override def parentRememberDuration: Duration = rememberDuration + windowDuration override def persist(storageLevel: StorageLevel): DStream[(K,V)] = { super.persist(storageLevel) @@ -55,7 +55,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( this } - override def checkpoint(interval: Time): DStream[(K, V)] = { + override def checkpoint(interval: Duration): DStream[(K, V)] = { super.checkpoint(interval) //reducedStream.checkpoint(interval) this @@ -66,11 +66,11 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( val invReduceF = invReduceFunc val currentTime = validTime - val currentWindow = Interval(currentTime - windowTime + parent.slideTime, currentTime) - val previousWindow = currentWindow - slideTime + val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, currentTime) + val previousWindow = currentWindow - slideDuration - logDebug("Window time = " + windowTime) - logDebug("Slide time = " + slideTime) + logDebug("Window time = " + windowDuration) + logDebug("Slide time = " + slideDuration) logDebug("ZeroTime = " + zeroTime) logDebug("Current window = " + currentWindow) logDebug("Previous window = " + previousWindow) @@ -87,11 +87,11 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( // // Get the RDDs of the reduced values in "old time steps" - val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideTime) + val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration) logDebug("# old RDDs = " + oldRDDs.size) // Get the RDDs of the reduced values in "new time steps" - val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideTime, currentWindow.endTime) + val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime) logDebug("# new RDDs = " + newRDDs.size) // Get the RDD of the reduced value of the previous window diff --git a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala index 6854bbe66502ed39a979a21721068639ff8a5211..1f9548bfb85ec952f8d1b35d2549fc31179a2f35 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/ShuffledDStream.scala @@ -2,7 +2,7 @@ package spark.streaming.dstream import spark.{RDD, Partitioner} import spark.SparkContext._ -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} private[streaming] class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( @@ -15,7 +15,7 @@ class 
ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest]( override def dependencies = List(parent) - override def slideTime: Time = parent.slideTime + override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[(K,C)]] = { parent.getOrCompute(validTime) match { diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala index cbe437229973001646a1489468bd72f0a148e215..8e4b20ea4c4e7e8022d02530c93e43bb297a08d3 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala @@ -29,7 +29,7 @@ class SocketReceiver[T: ClassManifest]( storageLevel: StorageLevel ) extends NetworkReceiver[T](streamId) { - lazy protected val dataHandler = new DataHandler(this, storageLevel) + lazy protected val blockGenerator = new BlockGenerator(storageLevel) override def getLocationPreference = None @@ -37,16 +37,16 @@ class SocketReceiver[T: ClassManifest]( logInfo("Connecting to " + host + ":" + port) val socket = new Socket(host, port) logInfo("Connected to " + host + ":" + port) - dataHandler.start() + blockGenerator.start() val iterator = bytesToObjects(socket.getInputStream()) while(iterator.hasNext) { val obj = iterator.next - dataHandler += obj + blockGenerator += obj } } protected def onStop() { - dataHandler.stop() + blockGenerator.stop() } } diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala index 175b3060c146a9bbab2ddd17ddfe83f982044a0b..a1ec2f5454f1c542e2a5c856fc7414552a474623 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala @@ -4,7 +4,7 @@ import spark.RDD import spark.Partitioner import spark.SparkContext._ import spark.storage.StorageLevel -import spark.streaming.{Time, DStream} +import spark.streaming.{Duration, Time, DStream} private[streaming] class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest]( @@ -18,14 +18,14 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife override def dependencies = List(parent) - override def slideTime = parent.slideTime + override def slideDuration: Duration = parent.slideDuration override val mustCheckpoint = true override def compute(validTime: Time): Option[RDD[(K, S)]] = { // Try to get the previous state RDD - getOrCompute(validTime - slideTime) match { + getOrCompute(validTime - slideDuration) match { case Some(prevStateRDD) => { // If previous state RDD exists diff --git a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala index 0337579514ad24f677674538df005c722dde0a53..99660d9dee0d02f14cfcd616c2b4c4f440c80c9f 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/TransformedDStream.scala @@ -1,7 +1,7 @@ package spark.streaming.dstream import spark.RDD -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} private[streaming] class TransformedDStream[T: ClassManifest, U: ClassManifest] ( @@ -11,7 +11,7 @@ class TransformedDStream[T: ClassManifest, U: ClassManifest] ( override def dependencies = List(parent) - override 
def slideTime: Time = parent.slideTime + override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(transformFunc(_, validTime)) diff --git a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala index 3bf4c2ecea82e77cd0a170f29bc45f91a38209d6..00bad5da34c7a19da4b98e8a0d20bf7b1ca87b5f 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/UnionDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.dstream -import spark.streaming.{DStream, Time} +import spark.streaming.{Duration, DStream, Time} import spark.RDD import collection.mutable.ArrayBuffer import spark.rdd.UnionRDD @@ -17,13 +17,13 @@ class UnionDStream[T: ClassManifest](parents: Array[DStream[T]]) throw new IllegalArgumentException("Array of parents have different StreamingContexts") } - if (parents.map(_.slideTime).distinct.size > 1) { + if (parents.map(_.slideDuration).distinct.size > 1) { throw new IllegalArgumentException("Array of parents have different slide times") } override def dependencies = parents.toList - override def slideTime: Time = parents.head.slideTime + override def slideDuration: Duration = parents.head.slideDuration override def compute(validTime: Time): Option[RDD[T]] = { val rdds = new ArrayBuffer[RDD[T]]() diff --git a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala index 7718794cbfa05ea97e00662ae48150a1e0c035df..cbf0c88108bfdb39811e597d1f24bb4bc183b789 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/WindowedDStream.scala @@ -3,35 +3,35 @@ package spark.streaming.dstream import spark.RDD import spark.rdd.UnionRDD import spark.storage.StorageLevel -import spark.streaming.{Interval, Time, DStream} +import spark.streaming.{Duration, Interval, Time, DStream} private[streaming] class WindowedDStream[T: ClassManifest]( parent: DStream[T], - _windowTime: Time, - _slideTime: Time) + _windowDuration: Duration, + _slideDuration: Duration) extends DStream[T](parent.ssc) { - if (!_windowTime.isMultipleOf(parent.slideTime)) - throw new Exception("The window duration of WindowedDStream (" + _slideTime + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") + if (!_windowDuration.isMultipleOf(parent.slideDuration)) + throw new Exception("The window duration of WindowedDStream (" + _windowDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") - if (!_slideTime.isMultipleOf(parent.slideTime)) - throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " + - "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") + if (!_slideDuration.isMultipleOf(parent.slideDuration)) + throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") parent.persist(StorageLevel.MEMORY_ONLY_SER) - def windowTime: Time = _windowTime + def windowDuration: Duration = _windowDuration override def dependencies = List(parent) - override def slideTime: Time = _slideTime + override def slideDuration: Duration = _slideDuration - override def
parentRememberDuration: Time = rememberDuration + windowTime + override def parentRememberDuration: Duration = rememberDuration + windowDuration override def compute(validTime: Time): Option[RDD[T]] = { - val currentWindow = Interval(validTime - windowTime + parent.slideTime, validTime) + val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) Some(new UnionRDD(ssc.sc, parent.slice(currentWindow))) } } diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala index dc38ef4912f3781479a22222c30349ea203980d8..f9e03c607d7554f6b4de2740e74f0af66cae311d 100644 --- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -196,18 +196,18 @@ class BasicOperationsSuite extends TestSuiteBase { // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2 // WindowedStream2 - assert(windowedStream2.generatedRDDs.contains(Seconds(10))) - assert(windowedStream2.generatedRDDs.contains(Seconds(8))) - assert(!windowedStream2.generatedRDDs.contains(Seconds(6))) + assert(windowedStream2.generatedRDDs.contains(Time(10000))) + assert(windowedStream2.generatedRDDs.contains(Time(8000))) + assert(!windowedStream2.generatedRDDs.contains(Time(6000))) // WindowedStream1 - assert(windowedStream1.generatedRDDs.contains(Seconds(10))) - assert(windowedStream1.generatedRDDs.contains(Seconds(4))) - assert(!windowedStream1.generatedRDDs.contains(Seconds(3))) + assert(windowedStream1.generatedRDDs.contains(Time(10000))) + assert(windowedStream1.generatedRDDs.contains(Time(4000))) + assert(!windowedStream1.generatedRDDs.contains(Time(3000))) // MappedStream - assert(mappedStream.generatedRDDs.contains(Seconds(10))) - assert(mappedStream.generatedRDDs.contains(Seconds(2))) - assert(!mappedStream.generatedRDDs.contains(Seconds(1))) + assert(mappedStream.generatedRDDs.contains(Time(10000))) + assert(mappedStream.generatedRDDs.contains(Time(2000))) + assert(!mappedStream.generatedRDDs.contains(Time(1000))) } } diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala index 28bdd53c3c95422a25e6fdf9fb2cd0ae8c6e0005..a76f61d4adb0601759634fc6ee03d4cf57710fff 100644 --- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala @@ -26,7 +26,7 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[ def compute(validTime: Time): Option[RDD[T]] = { logInfo("Computing RDD for time " + validTime) - val index = ((validTime - zeroTime) / slideTime - 1).toInt + val index = ((validTime - zeroTime) / slideDuration - 1).toInt val selectedInput = if (index < input.size) input(index) else Seq[T]() val rdd = ssc.sc.makeRDD(selectedInput, numPartitions) logInfo("Created RDD " + rdd.id + " with " + selectedInput) diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index 4bc5229465267272e41adad5bd46434ad883c089..fa117cfcf05d76a040996f19fceac96534bcabb0 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -207,11 +207,11 @@ class WindowOperationsSuite extends TestSuiteBase { test("groupByKeyAndWindow") { val input = bigInput val expectedOutput 
= bigGroupByOutput.map(_.map(x => (x._1, x._2.toSet))) - val windowTime = Seconds(2) - val slideTime = Seconds(1) - val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt + val windowDuration = Seconds(2) + val slideDuration = Seconds(1) + val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { - s.groupByKeyAndWindow(windowTime, slideTime) + s.groupByKeyAndWindow(windowDuration, slideDuration) .map(x => (x._1, x._2.toSet)) .persist() } @@ -221,21 +221,21 @@ class WindowOperationsSuite extends TestSuiteBase { test("countByWindow") { val input = Seq(Seq(1), Seq(1), Seq(1, 2), Seq(0), Seq(), Seq() ) val expectedOutput = Seq( Seq(1), Seq(2), Seq(3), Seq(3), Seq(1), Seq(0)) - val windowTime = Seconds(2) - val slideTime = Seconds(1) - val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt - val operation = (s: DStream[Int]) => s.countByWindow(windowTime, slideTime) + val windowDuration = Seconds(2) + val slideDuration = Seconds(1) + val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt + val operation = (s: DStream[Int]) => s.countByWindow(windowDuration, slideDuration) testOperation(input, operation, expectedOutput, numBatches, true) } test("countByKeyAndWindow") { val input = Seq(Seq(("a", 1)), Seq(("b", 1), ("b", 2)), Seq(("a", 10), ("b", 20))) val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3))) - val windowTime = Seconds(2) - val slideTime = Seconds(1) - val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt + val windowDuration = Seconds(2) + val slideDuration = Seconds(1) + val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { - s.countByKeyAndWindow(windowTime, slideTime).map(x => (x._1, x._2.toInt)) + s.countByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt)) } testOperation(input, operation, expectedOutput, numBatches, true) } @@ -247,12 +247,12 @@ class WindowOperationsSuite extends TestSuiteBase { name: String, input: Seq[Seq[Int]], expectedOutput: Seq[Seq[Int]], - windowTime: Time = Seconds(2), - slideTime: Time = Seconds(1) + windowDuration: Duration = Seconds(2), + slideDuration: Duration = Seconds(1) ) { test("window - " + name) { - val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt - val operation = (s: DStream[Int]) => s.window(windowTime, slideTime) + val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt + val operation = (s: DStream[Int]) => s.window(windowDuration, slideDuration) testOperation(input, operation, expectedOutput, numBatches, true) } } @@ -261,13 +261,13 @@ class WindowOperationsSuite extends TestSuiteBase { name: String, input: Seq[Seq[(String, Int)]], expectedOutput: Seq[Seq[(String, Int)]], - windowTime: Time = Seconds(2), - slideTime: Time = Seconds(1) + windowDuration: Duration = Seconds(2), + slideDuration: Duration = Seconds(1) ) { test("reduceByKeyAndWindow - " + name) { - val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt + val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { - s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist() + s.reduceByKeyAndWindow(_ + _, windowDuration, slideDuration).persist() } testOperation(input, operation, expectedOutput, numBatches, true) } @@ -277,13 +277,13 @@ class WindowOperationsSuite extends 
TestSuiteBase { name: String, input: Seq[Seq[(String, Int)]], expectedOutput: Seq[Seq[(String, Int)]], - windowTime: Time = Seconds(2), - slideTime: Time = Seconds(1) + windowDuration: Duration = Seconds(2), + slideDuration: Duration = Seconds(1) ) { test("reduceByKeyAndWindowInv - " + name) { - val numBatches = expectedOutput.size * (slideTime / batchDuration).toInt + val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { - s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime) + s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration) .persist() .checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing }
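Aside (not part of this patch): the rename above only changes the types and names used for window parameters; the user-facing pattern stays the same. Below is a minimal sketch of how the Duration-based window API reads from application code, using the same Seconds(2) window and Seconds(1) slide as WindowOperationsSuite. The `socketTextStream` source, "localhost:9999", the `local[2]` master, and the object name are illustrative assumptions only, not part of the diff; as the asserts in WindowedDStream and ReducedWindowedDStream enforce, both the window and slide lengths must be multiples of the parent stream's slide duration (here the 1 second batch duration).

```scala
// Illustrative sketch only (not part of this diff): windowing after the
// Time -> Duration rename. Input source, host, port, and master are assumptions.
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

object WindowedWordCountSketch {
  def main(args: Array[String]) {
    // 1 second batches; the window and slide durations below must be multiples of this.
    val ssc = new StreamingContext("local[2]", "WindowedWordCountSketch", Seconds(1))

    // Hypothetical text source; any registered input DStream works the same way.
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))

    // Count words over a 2 second window that slides every 1 second,
    // mirroring the windowDuration/slideDuration defaults in the test suite.
    val windowedCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(2), Seconds(1))

    windowedCounts.print()
    ssc.start()
  }
}
```

With a 1 second batch, Seconds(2)/Seconds(1) passes the isMultipleOf checks; passing, say, a Seconds(3) window over a 2 second batch would fail the window-duration assert at construction time.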