diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index b11cfa667eb9238671b1126357e5d4e5642b8b51..7b5a243e26414ef3d77212f360707ba2c01f460a 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -47,6 +47,8 @@ public final class JavaFlumeEventCount {
       System.exit(1);
     }

+    StreamingExamples.setStreamingLogLevels();
+
     String master = args[0];
     String host = args[1];
     int port = Integer.parseInt(args[2]);
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 16b8a948e6154ad7527041efdfb546a2f859d11f..04f62ee2041451db8ad249cd655762d0d5fde503 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -59,6 +59,8 @@ public final class JavaKafkaWordCount {
       System.exit(1);
     }

+    StreamingExamples.setStreamingLogLevels();
+
     // Create the context with a 2 second batch size
     JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
             new Duration(2000), System.getenv("SPARK_HOME"),
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index 1e2efd359cff297129b6aa35ac42cb19b40072f7..c37b0cacc9a513b6ab90034700c8138411855030 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -53,6 +53,8 @@ public final class JavaNetworkWordCount {
       System.exit(1);
     }

+    StreamingExamples.setStreamingLogLevels();
+
     // Create the context with a 1 second batch size
     JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
             new Duration(1000), System.getenv("SPARK_HOME"),
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
index e05551ab833010df77cb761a17709583967fa84a..7ef9c6c8f4aaf0cad16304289bda46a4a114fc03 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -41,6 +41,8 @@ public final class JavaQueueStream {
       System.exit(1);
     }

+    StreamingExamples.setStreamingLogLevels();
+
     // Create the context
     JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
             System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 4e0058cd707777c32ce3baa4d68e7d6e0d9d1e09..57e1b1f806e82ecfb88b3fa88d0360e77ea28bd4 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -18,17 +18,13 @@
 package org.apache.spark.streaming.examples

 import scala.collection.mutable.LinkedList
-import scala.util.Random
 import scala.reflect.ClassTag
+import scala.util.Random

-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.Props
-import akka.actor.actorRef2Scala
+import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}

 import org.apache.spark.SparkConf
-import org.apache.spark.streaming.Seconds
-import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
 import org.apache.spark.streaming.receivers.Receiver
 import org.apache.spark.util.AkkaUtils
@@ -147,6 +143,8 @@ object ActorWordCount {
       System.exit(1)
     }

+    StreamingExamples.setStreamingLogLevels()
+
     val Seq(master, host, port) = args.toSeq

     // Create the context and set the batch size
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
index ae3709b3d97f5561f857b7d6fab8ef0436888cbc..a59be7899dd37de2e50cfcd49e625992af589f2c 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
@@ -17,10 +17,10 @@

 package org.apache.spark.streaming.examples

-import org.apache.spark.util.IntParam
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam

 /**
  * Produces a count of events received from Flume.
@@ -44,6 +44,8 @@ object FlumeEventCount {
       System.exit(1)
     }

+    StreamingExamples.setStreamingLogLevels()
+
     val Array(master, host, IntParam(port)) = args

     val batchInterval = Milliseconds(2000)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
index ea6ea674196a1effb94ab7138998d4076cb41799..704b315ef8b2214ece9bb44ab1518d1838c9dfd5 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.examples
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._

-
 /**
  * Counts words in new text files created in the given directory
  * Usage: HdfsWordCount <master> <directory>
@@ -38,6 +37,8 @@ object HdfsWordCount {
       System.exit(1)
     }

+    StreamingExamples.setStreamingLogLevels()
+
     // Create the context
     val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2),
       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 31a94bd224a45f8f68f177777c7d37b9ff423de7..4a3d81c09a122aadbc5bc9672694a8be78261ce4 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -23,8 +23,8 @@
 import kafka.producer._

 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.util.RawTextHelper._
 import org.apache.spark.streaming.kafka._
+import org.apache.spark.streaming.util.RawTextHelper._

 /**
  * Consumes messages from one or more topics in Kafka and does wordcount.
@@ -40,12 +40,13 @@
  */
 object KafkaWordCount {
   def main(args: Array[String]) {
-
     if (args.length < 5) {
       System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
       System.exit(1)
     }

+    StreamingExamples.setStreamingLogLevels()
+
     val Array(master, zkQuorum, group, topics, numThreads) = args

     val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2),
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index 325290b66f4decbe55ed025a865816d09ecbcc5c..78b49fdcf1eb3a84bd8d4b6814b978c039e9f689 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -17,12 +17,8 @@

 package org.apache.spark.streaming.examples

-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
-import org.eclipse.paho.client.mqttv3.MqttException
-import org.eclipse.paho.client.mqttv3.MqttMessage
-import org.eclipse.paho.client.mqttv3.MqttTopic

 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
@@ -43,6 +39,8 @@ object MQTTPublisher {
       System.exit(1)
     }

+    StreamingExamples.setStreamingLogLevels()
+
     val Seq(brokerUrl, topic) = args.toSeq

     try {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index 6a32c75373a7efcc3b3787c8cc3179dacaa169c2..c12139b3ec863a6bd0f72d2e386af69ca3d6561c 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -39,6 +39,8 @@ object NetworkWordCount {
       System.exit(1)
     }

+    StreamingExamples.setStreamingLogLevels()
+
     // Create the context with a 1 second batch size
     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
index 9d640e716bca978b74c76bb5cab3ffef6ad37ebf..4d4968ba6ae3e0d2d90685bc2841d6895c139501 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
@@ -17,12 +17,12 @@

 package org.apache.spark.streaming.examples

+import scala.collection.mutable.SynchronizedQueue
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._

-import scala.collection.mutable.SynchronizedQueue
-
 object QueueStream {

   def main(args: Array[String]) {
@@ -30,7 +30,9 @@ object QueueStream {
       System.err.println("Usage: QueueStream <master>")
       System.exit(1)
     }
-
+
+    StreamingExamples.setStreamingLogLevels()
+
     // Create the context
     val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
index c0706d07249824cc740968d141e2d874c9c185c6..3d08d86567a9abe9d813f54811245196953e22c6 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
@@ -17,11 +17,10 @@

 package org.apache.spark.streaming.examples

-import org.apache.spark.util.IntParam
 import org.apache.spark.storage.StorageLevel
-
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.util.RawTextHelper
+import org.apache.spark.util.IntParam

 /**
  * Receives text from multiple rawNetworkStreams and counts how many '\n' delimited
@@ -45,6 +44,8 @@ object RawNetworkGrep {
       System.exit(1)
     }

+    StreamingExamples.setStreamingLogLevels()
+
     val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args

     // Create the context
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
index 002db57d592b2a88f491ca1813d95cd9e1a14ca6..1183eba84686bd5712c7668e58af05add060691d 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
@@ -39,6 +39,8 @@ object StatefulNetworkWordCount {
       System.exit(1)
     }

+    StreamingExamples.setStreamingLogLevels()
+
     val updateFunc = (values: Seq[Int], state: Option[Int]) => {
       val currentCount = values.foldLeft(0)(_ + _)

diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala
new file mode 100644
index 0000000000000000000000000000000000000000..d41d84a980dc73b8f4bcacf56d2a4de2af399c05
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala
@@ -0,0 +1,21 @@
+package org.apache.spark.streaming.examples
+
+import org.apache.spark.Logging
+
+import org.apache.log4j.{Level, Logger}
+
+/** Utility functions for Spark Streaming examples. */
+object StreamingExamples extends Logging {
+
+  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
+  def setStreamingLogLevels() {
+    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
+    if (!log4jInitialized) {
+      // We first log something to initialize Spark's default logging, then we override the
+      // logging level.
+      logInfo("Setting log level to [WARN] for streaming example." +
+ + " To override add a custom log4j.properties to the classpath.") + Logger.getRootLogger.setLevel(Level.WARN) + } + } +} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala index 3ccdc908e23c43d0c3424a5b6719950e7c658702..80b5a98b142c1218047d957f1dd554e81e316b64 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -17,12 +17,12 @@ package org.apache.spark.streaming.examples -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.storage.StorageLevel import com.twitter.algebird._ -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.SparkContext._ +import org.apache.spark.SparkContext._ +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.twitter._ /** @@ -51,6 +51,8 @@ object TwitterAlgebirdCMS { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + // CMS parameters val DELTA = 1E-3 val EPS = 0.01 diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala index c7e83e76b00570e721ee1fe965d119178b14528d..cb2f2c51a0cd65c2bf6355d1b6632c617574fb68 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -17,10 +17,11 @@ package org.apache.spark.streaming.examples -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.storage.StorageLevel -import com.twitter.algebird.HyperLogLog._ import com.twitter.algebird.HyperLogLogMonoid +import com.twitter.algebird.HyperLogLog._ + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.twitter._ /** @@ -44,6 +45,8 @@ object TwitterAlgebirdHLL { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + /** Bit size parameter for HyperLogLog, trades off accuracy vs size */ val BIT_SIZE = 12 val (master, filters) = (args.head, args.tail) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala index e2b0418d55d2b14b08d50aa1500a6806306a7f5c..16c10feaba2c11bb265feb28fa0307e8c740dc49 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala @@ -36,6 +36,8 @@ object TwitterPopularTags { System.exit(1) } + StreamingExamples.setStreamingLogLevels() + val (master, filters) = (args.head, args.tail) val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2), diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala index 03902ec353babfbc6d2991d6af01d8e6ed19d09e..12d2a1084f9002bef4957faf1ea126229219db7e 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala +++ 
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -76,6 +76,7 @@ object ZeroMQWordCount {
         "In local mode, <master> should be 'local[n]' with n > 1")
       System.exit(1)
     }
+    StreamingExamples.setStreamingLogLevels()
     val Seq(master, url, topic) = args.toSeq

     // Create the context and set the batch size
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index 807af199f4fd0813b2961784f1dca9ed88d9a69a..da6b67bcceefe3ad5ad2b8e8690a462b763529b1 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -17,9 +17,10 @@

 package org.apache.spark.streaming.examples.clickstream

+import org.apache.spark.SparkContext._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.examples.StreamingExamples

 /** Analyses a streaming dataset of web page views. This class demonstrates several types of
   * operators available in Spark streaming.
@@ -36,6 +37,7 @@ object PageViewStream {
       " errorRatePerZipCode, activeUserCount, popularUsersSeen")
       System.exit(1)
     }
+    StreamingExamples.setStreamingLogLevels()
     val metric = args(0)
     val host = args(1)
     val port = args(2).toInt
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index 00671ba5206f95382c021c220c081cea484656ab..837f1ea1d8962e8a2baccbbd110a01171472aa2f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -333,7 +333,7 @@ abstract class DStream[T: ClassTag] (
     var numForgotten = 0
     val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
     generatedRDDs --= oldRDDs.keys
-    logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " +
+    logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
       (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
     dependencies.foreach(_.clearOldMetadata(time))
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index a09b891956efe2348043c85cf6f11c22596e3be4..62d07b22c6d5c958a9c6a91c6e602c8d2bb82926 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -105,18 +105,18 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {

   def generateJobs(time: Time): Seq[Job] = {
     this.synchronized {
-      logInfo("Generating jobs for time " + time)
+      logDebug("Generating jobs for time " + time)
       val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time))
-      logInfo("Generated " + jobs.length + " jobs for time " + time)
+      logDebug("Generated " + jobs.length + " jobs for time " + time)
       jobs
     }
   }

   def clearOldMetadata(time: Time) {
     this.synchronized {
-      logInfo("Clearing old metadata for time " + time)
+      logDebug("Clearing old metadata for time " + time)
       outputStreams.foreach(_.clearOldMetadata(time))
-      logInfo("Cleared old metadata for time " + time)
+      logDebug("Cleared old metadata for time " + time)
     }
   }
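
Note: after this patch, every example follows the same calling convention: validate the CLI arguments, call StreamingExamples.setStreamingLogLevels(), then build the StreamingContext. The sketch below is not part of the patch; SketchWordCount and its argument handling are invented for illustration, while the helper call and the StreamingContext constructor match the usage in the diff above.

    package org.apache.spark.streaming.examples

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Hypothetical example (not in the patch) showing the convention the diff
    // applies to each example's main(): check args, set log levels, build context.
    object SketchWordCount {
      def main(args: Array[String]) {
        if (args.length < 1) {
          System.err.println("Usage: SketchWordCount <master>")
          System.exit(1)
        }

        // Called before the StreamingContext exists, so the WARN level is in
        // place before Spark and the streaming machinery start emitting INFO.
        StreamingExamples.setStreamingLogLevels()

        val ssc = new StreamingContext(args(0), "SketchWordCount", Seconds(1),
          System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
        // ... wire up DStreams and call ssc.start(), as in the examples above ...
      }
    }

The helper only lowers the root logger to WARN when no log4j appenders are already configured, so a user-supplied log4j.properties on the classpath still takes precedence, which is what the logInfo message in StreamingExamples.scala advertises.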