Commit 18b9b3b9 authored by Tathagata Das's avatar Tathagata Das

More classes made private[streaming] to hide from scala docs.

parent 7e0271b4
Showing 137 additions and 52 deletions
......@@ -12,6 +12,7 @@ third_party/libmesos.so
third_party/libmesos.dylib
conf/java-opts
conf/spark-env.sh
conf/streaming-env.sh
conf/log4j.properties
docs/_site
docs/api
......@@ -31,4 +32,5 @@ project/plugins/src_managed/
logs/
log/
spark-tests.log
streaming-tests.log
dependency-reduced-pom.xml
......@@ -189,7 +189,7 @@ abstract class DStream[T: ClassManifest] (
val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds
logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
assert(
metadataCleanerDelay < 0 || rememberDuration < metadataCleanerDelay * 1000,
metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
"It seems you are doing some DStream window operation or setting a checkpoint interval " +
"which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
"than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
......
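For reference, a standalone sketch of the unit handling this assertion now makes explicit; the concrete numbers below are assumptions for illustration only, not values from this diff.

// The cleaner delay is configured in seconds, while rememberDuration is a Time
// in milliseconds, hence the explicit .milliseconds and the * 1000 conversion.
val metadataCleanerDelaySeconds = 3600L        // assumed: one hour
val rememberDurationMillis      = 60L * 1000L  // assumed: a one-minute window
assert(
  metadataCleanerDelaySeconds < 0 ||
    rememberDurationMillis < metadataCleanerDelaySeconds * 1000,
  "rememberDuration must be smaller than the metadata cleaner delay")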
......@@ -14,7 +14,7 @@ class JobManager(ssc: StreamingContext, numThreads: Int = 1) extends Logging {
try {
val timeTaken = job.run()
logInfo("Total delay: %.5f s for job %s (execution: %.5f s)".format(
(System.currentTimeMillis() - job.time) / 1000.0, job.id, timeTaken / 1000.0))
(System.currentTimeMillis() - job.time.milliseconds) / 1000.0, job.id, timeTaken / 1000.0))
} catch {
case e: Exception =>
logError("Running " + job + " failed", e)
......
......@@ -22,7 +22,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration, generateRDDs(_))
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, generateRDDs(_))
def start() {
// If context was started from checkpoint, then restart timer such that
......
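Because the Scheduler now resolves its clock reflectively from the spark.streaming.clock property, a test can swap in the manual clock that this commit also marks private[streaming]. A minimal sketch, assuming it runs before the Scheduler is created:

// Default is spark.streaming.util.SystemClock; ManualClock is the test clock
// declared later in this commit in the same package.
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")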
......@@ -15,7 +15,6 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.hadoop.fs.Path
import java.util.UUID
......@@ -101,14 +100,27 @@ class StreamingContext private (
protected[streaming] var receiverJobThread: Thread = null
protected[streaming] var scheduler: Scheduler = null
/**
* Sets each DStream in this context to remember the RDDs it generated in the last given duration.
* DStreams remember RDDs only for a limited duration of time and release them for garbage
* collection. This method allows the developer to specify how long to remember the RDDs (
* if the developer wishes to query old data outside the DStream computation).
* @param duration Minimum duration that each DStream should remember its RDDs
*/
def remember(duration: Time) {
graph.remember(duration)
}
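A small usage sketch of the new remember API; ssc is an assumed, already-constructed StreamingContext.

// Keep generated RDDs for at least one minute so old data can be queried
// outside the DStream computation.
ssc.remember(Minutes(1))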
def checkpoint(dir: String, interval: Time = null) {
if (dir != null) {
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(dir))
checkpointDir = dir
/**
* Sets the context to periodically checkpoint the DStream operations for master
* fault-tolerance. By default, the graph will be checkpointed every batch interval.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
* @param interval checkpoint interval
*/
def checkpoint(directory: String, interval: Time = null) {
if (directory != null) {
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
checkpointDir = directory
checkpointInterval = interval
} else {
checkpointDir = null
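A hedged usage sketch of the renamed checkpoint(directory, interval) method; the HDFS path and interval are illustrative assumptions.

// Checkpoint the DStream graph every 10 seconds; passing interval = null
// falls back to checkpointing at the batch interval.
ssc.checkpoint("hdfs://namenode:9000/checkpoints/streaming-app", Seconds(10))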
......@@ -122,9 +134,8 @@ class StreamingContext private (
protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
/**
/**
* Create an input stream that pulls messages from a Kafka broker.
*
* @param hostname Zookeeper hostname.
* @param port Zookeeper port.
* @param groupId The group id for this consumer.
......@@ -147,6 +158,15 @@ class StreamingContext private (
inputStream
}
/**
* Create an input stream from a network source hostname:port. Data is received using
* a TCP socket and the received bytes are interpreted as UTF8-encoded, newline (\n)
* delimited lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def networkTextStream(
hostname: String,
port: Int,
......@@ -155,6 +175,16 @@ class StreamingContext private (
networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
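A minimal sketch of networkTextStream as documented above; ssc, the hostname, and the port are assumptions, as is a process writing newline-delimited UTF8 text to that socket.

val lines = ssc.networkTextStream("localhost", 9999)
lines.print()   // print a few elements of each batch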
/**
* Create an input stream from a network source hostname:port. Data is received using
* a TCP socket and the received bytes are interpreted as objects using the given
* converter.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param converter Function to convert the byte stream to objects
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
def networkStream[T: ClassManifest](
hostname: String,
port: Int,
......@@ -166,16 +196,32 @@ class StreamingContext private (
inputStream
}
/**
* Creates an input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
* @param storageLevel Storage level to use for storing the received objects
*/
def flumeStream (
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): DStream[SparkFlumeEvent] = {
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
graph.addInputStream(inputStream)
inputStream
}
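A usage sketch of flumeStream; ssc is an assumed StreamingContext and the port is an assumption for a Flume agent whose Avro sink points at this worker.

val flumeEvents = ssc.flumeStream("localhost", 4141)
flumeEvents.print()   // SparkFlumeEvent records, using the default storage level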
/**
* Create an input stream from a network source hostname:port, where data is received
* as serialized blocks (serialized using Spark's serializer) that can be directly
* pushed into the block manager without deserializing them. This is the most efficient
* way to receive data.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects in the received blocks
*/
def rawNetworkStream[T: ClassManifest](
hostname: String,
port: Int,
......@@ -188,7 +234,11 @@ class StreamingContext private (
/**
* Creates an input stream that monitors a Hadoop-compatible filesystem
* for new files and executes the necessary processing on them.
* for new files and reads them using the given key-value types and input format.
* @param directory HDFS directory to monitor for new files
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[
K: ClassManifest,
......@@ -200,13 +250,23 @@ class StreamingContext private (
inputStream
}
/**
* Creates an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat).
* @param directory HDFS directory to monitor for new files
*/
def textFileStream(directory: String): DStream[String] = {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
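A sketch of the two file-stream variants documented above; ssc and the HDFS directory are assumptions.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Tail an HDFS directory for new text files; each line becomes a record.
val logLines = ssc.textFileStream("hdfs://namenode:9000/logs/incoming")

// The same stream built with the lower-level fileStream, supplying the
// Hadoop key, value and input-format types explicitly.
val logLines2 = ssc
  .fileStream[LongWritable, Text, TextInputFormat]("hdfs://namenode:9000/logs/incoming")
  .map(_._2.toString)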
/**
* Creates an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue
* it will process either one or all of the RDDs returned by the queue.
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @param defaultRDD Default RDD returned by the DStream when the queue is empty
* @tparam T Type of objects in the RDD
*/
def queueStream[T: ClassManifest](
queue: Queue[RDD[T]],
......@@ -218,13 +278,9 @@ class StreamingContext private (
inputStream
}
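A usage sketch of queueStream, mirroring the argument order of the call in the overload removed just below; ssc is an assumed StreamingContext, and the RDDs pushed into the queue are assumed to be built by the caller (for example, a test).

import scala.collection.mutable.Queue
import spark.RDD

val rddQueue = new Queue[RDD[String]]()
// oneAtATime = true, no default RDD when the queue is empty
val queued = ssc.queueStream(rddQueue, true, null)
queued.print()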
def queueStream[T: ClassManifest](array: Array[RDD[T]]): DStream[T] = {
val queue = new Queue[RDD[T]]
val inputStream = queueStream(queue, true, null)
queue ++= array
inputStream
}
/**
* Create a unified DStream from multiple DStreams of the same type and same interval
*/
def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
new UnionDStream[T](streams.toArray)
}
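A usage sketch of union; stream1 and stream2 are assumed DStreams of the same element type, created on the same context with the same batch interval.

val merged = ssc.union(Seq(stream1, stream2))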
......@@ -256,7 +312,7 @@ class StreamingContext private (
}
/**
* This function starts the execution of the streams.
* Starts the execution of the streams.
*/
def start() {
if (checkpointDir != null && checkpointInterval == null && graph != null) {
......@@ -284,7 +340,7 @@ class StreamingContext private (
}
/**
* This function stops the execution of the streams.
* Stops the execution of the streams.
*/
def stop() {
try {
......@@ -302,6 +358,10 @@ class StreamingContext private (
object StreamingContext {
implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
new PairDStreamFunctions[K, V](stream)
}
protected[streaming] def createNewSparkContext(master: String, frameworkName: String): SparkContext = {
// Set the default cleaner delay to an hour if not already set.
......@@ -312,10 +372,6 @@ object StreamingContext {
new SparkContext(master, frameworkName)
}
implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
new PairDStreamFunctions[K, V](stream)
}
protected[streaming] def rddToFileName[T](prefix: String, suffix: String, time: Time): String = {
if (prefix == null) {
time.milliseconds.toString
......
package spark.streaming
/**
* This class is simple wrapper class that represents time in UTC.
* @param millis Time in UTC long
* This is a simple class that represents time. Internally, it represents time as UTC.
* The recommended way to create instances of Time is to use helper objects
* [[spark.streaming.Milliseconds]], [[spark.streaming.Seconds]], and [[spark.streaming.Minutes]].
* @param millis Time in UTC.
*/
case class Time(private val millis: Long) {
def < (that: Time): Boolean = (this.millis < that.millis)
def <= (that: Time): Boolean = (this.millis <= that.millis)
def > (that: Time): Boolean = (this.millis > that.millis)
def >= (that: Time): Boolean = (this.millis >= that.millis)
......@@ -45,23 +47,33 @@ case class Time(private val millis: Long) {
def milliseconds: Long = millis
}
object Time {
private[streaming] object Time {
val zero = Time(0)
implicit def toTime(long: Long) = Time(long)
implicit def toLong(time: Time) = time.milliseconds
}
/**
* Helper object that creates an instance of [[spark.streaming.Time]] representing
* a given number of milliseconds.
*/
object Milliseconds {
def apply(milliseconds: Long) = Time(milliseconds)
}
/**
* Helper object that creates an instance of [[spark.streaming.Time]] representing
* a given number of seconds.
*/
object Seconds {
def apply(seconds: Long) = Time(seconds * 1000)
}
}
object Minutes {
/**
* Helper object that creates an instance of [[spark.streaming.Time]] representing
* a given number of minutes.
*/
object Minutes {
def apply(minutes: Long) = Time(minutes * 60000)
}
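A small sketch of the helper objects and the Time operators shown above; only the import line is an assumption about the surrounding file.

import spark.streaming.{Milliseconds, Seconds, Minutes}

// All helpers construct Time values, measured in milliseconds.
val batchInterval = Seconds(1)        // Time(1000)
val window        = Minutes(5)        // Time(300000)
val slide         = Milliseconds(500) // Time(500)
println(window.milliseconds)          // 300000
println(batchInterval < window)       // true, via Time's comparison operators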
......@@ -4,6 +4,7 @@ import spark.{RDD, Partitioner}
import spark.rdd.CoGroupedRDD
import spark.streaming.{Time, DStream}
private[streaming]
class CoGroupedDStream[K : ClassManifest](
parents: Seq[DStream[(_, _)]],
partitioner: Partitioner
......
......@@ -10,7 +10,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import scala.collection.mutable.HashSet
private[streaming]
class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
@transient ssc_ : StreamingContext,
directory: String,
......
......@@ -17,6 +17,7 @@ import java.net.InetSocketAddress
import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.nio.ByteBuffer
private[streaming]
class FlumeInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
......@@ -93,6 +94,7 @@ private[streaming] object SparkFlumeEvent {
}
/** A simple server that implements Flume's Avro protocol. */
private[streaming]
class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
override def append(event : AvroFlumeEvent) : Status = {
receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)
......@@ -108,12 +110,13 @@ class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
/** A NetworkReceiver which listens for events using the
* Flume Avro interface.*/
private[streaming]
class FlumeReceiver(
streamId: Int,
host: String,
port: Int,
storageLevel: StorageLevel
) extends NetworkReceiver[SparkFlumeEvent](streamId) {
streamId: Int,
host: String,
port: Int,
storageLevel: StorageLevel
) extends NetworkReceiver[SparkFlumeEvent](streamId) {
lazy val dataHandler = new DataHandler(this, storageLevel)
......
......@@ -21,10 +21,12 @@ import scala.collection.JavaConversions._
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
// NOT USED - Originally intended for fault-tolerance
// Metadata for a Kafka Stream that is sent to the Master
private[streaming]
case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])
// NOT USED - Originally intended for fault-tolerance
// Checkpoint data specific to a KafkaInputDstream
case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
private[streaming]
case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
/**
......@@ -39,6 +41,7 @@ case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
* By default the value is pulled from zookeeper.
* @param storageLevel RDD storage level.
*/
private[streaming]
class KafkaInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
......@@ -98,6 +101,7 @@ class KafkaInputDStream[T: ClassManifest](
}
}
private[streaming]
class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) {
......
......@@ -17,6 +17,7 @@ import java.util.concurrent.ArrayBlockingQueue
* data into Spark Streaming, though it requires the sender to batch data and serialize it
* in the format that the system is configured with.
*/
private[streaming]
class RawInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
......@@ -29,6 +30,7 @@ class RawInputDStream[T: ClassManifest](
}
}
private[streaming]
class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel)
extends NetworkReceiver[Any](streamId) {
......
......@@ -11,6 +11,7 @@ import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import spark.streaming.{Interval, Time, DStream}
private[streaming]
class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
......
......@@ -6,6 +6,7 @@ import spark.storage.StorageLevel
import java.io._
import java.net.Socket
private[streaming]
class SocketInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
......@@ -19,7 +20,7 @@ class SocketInputDStream[T: ClassManifest](
}
}
private[streaming]
class SocketReceiver[T: ClassManifest](
streamId: Int,
host: String,
......@@ -50,7 +51,7 @@ class SocketReceiver[T: ClassManifest](
}
private[streaming]
object SocketReceiver {
/**
......
......@@ -6,6 +6,7 @@ import spark.SparkContext._
import spark.storage.StorageLevel
import spark.streaming.{Time, DStream}
private[streaming]
class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManifest](
parent: DStream[(K, V)],
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
......
......@@ -5,6 +5,7 @@ import spark.RDD
import collection.mutable.ArrayBuffer
import spark.rdd.UnionRDD
private[streaming]
class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
extends DStream[T](parents.head.ssc) {
......
......@@ -5,7 +5,7 @@ import spark.rdd.UnionRDD
import spark.storage.StorageLevel
import spark.streaming.{Interval, Time, DStream}
private[streaming]
class WindowedDStream[T: ClassManifest](
parent: DStream[T],
_windowTime: Time,
......
package spark.streaming.util
import spark.streaming._
trait Clock {
private[streaming]
trait Clock {
def currentTime(): Long
def waitTillTime(targetTime: Long): Long
}
private[streaming]
class SystemClock() extends Clock {
val minPollTime = 25L
......@@ -54,6 +53,7 @@ class SystemClock() extends Clock {
}
}
private[streaming]
class ManualClock() extends Clock {
var time = 0L
......
package spark.streaming.util
private[streaming]
class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
val minPollTime = 25L
......
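A construction sketch for RecurringTimer, mirroring how the Scheduler drives generateRDDs above; the callback body is an assumption, and starting the timer is omitted because its start method is not shown in this hunk.

val timer = new RecurringTimer(new SystemClock(), 1000L, (time: Long) => {
  println("tick at " + time)   // fired once per period (milliseconds)
})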