diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 1c9ece62707818cfc403103269a81c2448dc3552..3273817c78562f03ec6629d5c229159968db34b0 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -175,7 +175,7 @@ When an output operator is called, it triggers the computation of a stream. Curr
 
 <table class="table">
 <tr><th style="width:30%">Operator</th><th>Meaning</th></tr>
 <tr>
-  <td> <b>foreach</b>(<i>func</i>) </td>
+  <td> <b>foreachRDD</b>(<i>func</i>) </td>
   <td> The fundamental output operator. Applies a function, <i>func</i>, to each RDD generated from the stream. This function should have side effects, such as printing output, saving the RDD to external files, or writing it over the network to an external system. </td>
 </tr>
@@ -375,7 +375,7 @@ There are two failure behaviors based on which input sources are used.
 1. _Using HDFS files as input source_ - Since the data is reliably stored on HDFS, all data can re-computed and therefore no data will be lost due to any failure.
 1. _Using any input source that receives data through a network_ - For network-based data sources like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster (default replication factor is 2). So if a worker node fails, then the system can recompute the lost from the the left over copy of the input data. However, if the worker node where a network receiver was running fails, then a tiny bit of data may be lost, that is, the data received by the system but not yet replicated to other node(s). The receiver will be started on a different node and it will continue to receive data.
 
-Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be same even if there were was a worker node failure. However, output operations (like `foreach`) have _at-least once_ semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to HDFS using the `saveAs*Files` operations (as the file will simply get over-written by the same data), additional transactions-like mechanisms may be necessary to achieve exactly-once semantics for output operations.
+Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation always leads to the same result. As a result, all DStream transformations are guaranteed to have _exactly-once_ semantics. That is, the final transformed result will be the same even if there was a worker node failure. However, output operations (like `foreachRDD`) have _at-least-once_ semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure. While this is acceptable for saving to HDFS using the `saveAs*Files` operations (as the file will simply get overwritten by the same data), additional transaction-like mechanisms may be necessary to achieve exactly-once semantics for output operations.
 
 ## Failure of the Driver Node
 A system that is required to operate 24/7 needs to be able tolerate the failure of the driver node as well. Spark Streaming does this by saving the state of the DStream computation periodically to a HDFS file, that can be used to restart the streaming computation in the event of a failure of the driver node. This checkpointing is enabled by setting a HDFS directory for checkpointing using `ssc.checkpoint(<checkpoint directory>)` as described [earlier](#rdd-checkpointing-within-dstreams). To elaborate, the following state is periodically saved to a file.
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
index 3d08d86567a9abe9d813f54811245196953e22c6..99b79c3949a4ed4c6fd166501070d3907e2aff6f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
@@ -58,7 +58,7 @@ object RawNetworkGrep {
     val rawStreams = (1 to numStreams).map(_ =>
       ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
     val union = ssc.union(rawStreams)
-    union.filter(_.contains("the")).count().foreach(r =>
+    union.filter(_.contains("the")).count().foreachRDD(r =>
       println("Grep count: " + r.collect().mkString))
     ssc.start()
   }
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 80b5a98b142c1218047d957f1dd554e81e316b64..483c4d311810fb81cb789415d67f11d2840be6f4 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -81,7 +81,7 @@ object TwitterAlgebirdCMS {
     val exactTopUsers = users.map(id => (id, 1))
       .reduceByKey((a, b) => a + b)
 
-    approxTopUsers.foreach(rdd => {
+    approxTopUsers.foreachRDD(rdd => {
       if (rdd.count() != 0) {
         val partial = rdd.first()
         val partialTopK = partial.heavyHitters.map(id =>
@@ -96,7 +96,7 @@ object TwitterAlgebirdCMS {
       }
     })
 
-    exactTopUsers.foreach(rdd => {
+    exactTopUsers.foreachRDD(rdd => {
       if (rdd.count() != 0) {
         val partialMap = rdd.collect().toMap
         val partialTopK = rdd.map(
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index cb2f2c51a0cd65c2bf6355d1b6632c617574fb68..94c2bf29ac4333c188ac4e2749fab7d4a3eac348 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -67,7 +67,7 @@ object TwitterAlgebirdHLL {
 
     val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
 
-    approxUsers.foreach(rdd => {
+    approxUsers.foreachRDD(rdd => {
       if (rdd.count() != 0) {
         val partial = rdd.first()
         globalHll += partial
@@ -76,7 +76,7 @@ object TwitterAlgebirdHLL {
       }
     })
 
-    exactUsers.foreach(rdd => {
+    exactUsers.foreachRDD(rdd => {
       if (rdd.count() != 0) {
         val partial = rdd.first()
         userSet ++= partial
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index 16c10feaba2c11bb265feb28fa0307e8c740dc49..8a70d4a978cd43c5a8507373704e9a99d09e8e7f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -56,13 +56,13 @@ object TwitterPopularTags {
 
     // Print popular hashtags
-    topCounts60.foreach(rdd => {
+    topCounts60.foreachRDD(rdd => {
       val topList = rdd.take(5)
       println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
       topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
     })
 
-    topCounts10.foreach(rdd => {
+    topCounts10.foreachRDD(rdd => {
       val topList = rdd.take(5)
       println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
       topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index da6b67bcceefe3ad5ad2b8e8690a462b763529b1..bb44bc3d06ef3918ff8b80352ae6e14b62c7c646 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -91,7 +91,7 @@ object PageViewStream {
       case "popularUsersSeen" =>
         // Look for users in our existing dataset and print it out if we have a match
         pageViews.map(view => (view.userID, 1))
-          .foreach((rdd, time) => rdd.join(userList)
+          .foreachRDD((rdd, time) => rdd.join(userList)
           .map(_._2._2)
           .take(10)
           .foreach(u => println("Saw user %s at time %s".format(u, time))))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index b98f4a5101a780475b37674470e9c6a9cf6e518e..9432a709d0ca964dc37918c2abe771fe5c75ce29 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -17,19 +17,20 @@
 
 package org.apache.spark.streaming
 
-import StreamingContext._
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.MetadataCleaner
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 
+import scala.deprecated
 import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
 
-import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
+import StreamingContext._
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.Job
+import org.apache.spark.util.MetadataCleaner
 
 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -487,15 +488,29 @@ abstract class DStream[T: ClassTag] (
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * 'this' DStream will be registered as an output stream and therefore materialized.
    */
-  def foreach(foreachFunc: RDD[T] => Unit) {
-    this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
+  @deprecated("use foreachRDD", "0.9.0")
+  def foreach(foreachFunc: RDD[T] => Unit) = this.foreachRDD(foreachFunc)
+
+  /**
+   * Apply a function to each RDD in this DStream. This is an output operator, so
+   * 'this' DStream will be registered as an output stream and therefore materialized.
+   */
+  @deprecated("use foreachRDD", "0.9.0")
+  def foreach(foreachFunc: (RDD[T], Time) => Unit) = this.foreachRDD(foreachFunc)
+
+  /**
+   * Apply a function to each RDD in this DStream. This is an output operator, so
+   * 'this' DStream will be registered as an output stream and therefore materialized.
+   */
+  def foreachRDD(foreachFunc: RDD[T] => Unit) {
+    this.foreachRDD((r: RDD[T], t: Time) => foreachFunc(r))
   }
 
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * 'this' DStream will be registered as an output stream and therefore materialized.
    */
-  def foreach(foreachFunc: (RDD[T], Time) => Unit) {
+  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
     ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
   }
 
@@ -719,7 +734,7 @@ abstract class DStream[T: ClassTag] (
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsObjectFile(file)
     }
-    this.foreach(saveFunc)
+    this.foreachRDD(saveFunc)
   }
 
   /**
@@ -732,7 +747,7 @@ abstract class DStream[T: ClassTag] (
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsTextFile(file)
     }
-    this.foreach(saveFunc)
+    this.foreachRDD(saveFunc)
   }
 
   def register() {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index 56dbcbda23a7029bda765437ce235430136fa4df..69d80c37112375de3d63d7d80b027a7339b0dff0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -582,7 +582,7 @@ extends Serializable {
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
     }
-    self.foreach(saveFunc)
+    self.foreachRDD(saveFunc)
   }
 
   /**
@@ -612,7 +612,7 @@ extends Serializable {
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
     }
-    self.foreach(saveFunc)
+    self.foreachRDD(saveFunc)
   }
 
   private def getKeyClass() = implicitly[ClassTag[K]].runtimeClass
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 64f38ce1c0283278c358c44572d79881652b3b37..cea4795eb527be76eeb6251717f6335d45a28f43 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -243,17 +243,39 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * 'this' DStream will be registered as an output stream and therefore materialized.
+   *
+   * @deprecated As of release 0.9.0, replaced by foreachRDD
    */
+  @Deprecated
   def foreach(foreachFunc: JFunction[R, Void]) {
-    dstream.foreach(rdd => foreachFunc.call(wrapRDD(rdd)))
+    foreachRDD(foreachFunc)
   }
 
   /**
    * Apply a function to each RDD in this DStream. This is an output operator, so
    * 'this' DStream will be registered as an output stream and therefore materialized.
+   *
+   * @deprecated As of release 0.9.0, replaced by foreachRDD
    */
+  @Deprecated
   def foreach(foreachFunc: JFunction2[R, Time, Void]) {
-    dstream.foreach((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
+    foreachRDD(foreachFunc)
+  }
+
+  /**
+   * Apply a function to each RDD in this DStream. This is an output operator, so
+   * 'this' DStream will be registered as an output stream and therefore materialized.
+   */
+  def foreachRDD(foreachFunc: JFunction[R, Void]) {
+    dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
+  }
+
+  /**
+   * Apply a function to each RDD in this DStream. This is an output operator, so
+   * 'this' DStream will be registered as an output stream and therefore materialized.
+   */
+  def foreachRDD(foreachFunc: JFunction2[R, Time, Void]) {
+    dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
   }
 
   /**
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index ee6b433d1f1fa2d05a33991cad2d7cdf0b81a7c5..9a187ce031f091475cfc961000fd6ac6a1beeba9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -383,7 +383,7 @@ class BasicOperationsSuite extends TestSuiteBase {
     val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
     val stream = new TestInputStream[Int](ssc, input, 2)
     ssc.registerInputStream(stream)
-    stream.foreach(_ => {}) // Dummy output stream
+    stream.foreachRDD(_ => {}) // Dummy output stream
    ssc.start()
     Thread.sleep(2000)
     def getInputFromSlice(fromMillis: Long, toMillis: Long) = {
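For reviewers, a minimal before/after sketch of how user code migrates under this rename. Everything here except the `foreach`/`foreachRDD` calls is an illustrative stand-in and not part of this patch: the socket source, host/port, object name, and word-count pipeline are assumptions chosen to keep the example self-contained.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
// Brings in the implicit conversions (e.g. to pair-DStream functions for reduceByKey).
import org.apache.spark.streaming.StreamingContext._

object ForeachRDDMigrationExample {
  def main(args: Array[String]) {
    // One-second batches on a local two-thread master (illustrative settings).
    val ssc = new StreamingContext("local[2]", "ForeachRDDMigrationExample", Seconds(1))
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

    // Before this patch (still compiles after it, now with a deprecation warning):
    // counts.foreach(rdd => println("Unique words in batch: " + rdd.count()))

    // After this patch: identical semantics, clearer name. The function runs once
    // per generated RDD (i.e. once per batch interval), not once per record.
    counts.foreachRDD((rdd: RDD[(String, Int)], time: Time) =>
      println("Counts at " + time + ": " + rdd.take(5).mkString(", ")))

    ssc.start()
  }
}
```

As the streaming guide hunk above notes, `foreachRDD` keeps the at-least-once output semantics of the old `foreach`; only the name changes, so idempotent writes (or transaction-like mechanisms) remain the caller's responsibility.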