Commit 47b7ebad authored by Matei Zaharia

Added the Spark Streaming code, ported to Akka 2

parent dc8763fc
Showing 1208 additions and 1 deletion
package spark
import scala.collection.mutable.HashMap
class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
val index = idx
}
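// An RDD backed by blocks already stored in the BlockManager: each split wraps one
// block ID, compute() fetches that block's iterator from the local BlockManager (and
// fails if the block is missing), and preferredLocations() reports where the block lives.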
class BlockRDD[T: ClassManifest](sc: SparkContext, blockIds: Array[String]) extends RDD[T](sc) {
@transient
val splits_ = (0 until blockIds.size).map(i => {
new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
}).toArray
@transient
lazy val locations_ = {
val blockManager = SparkEnv.get.blockManager
/*val locations = blockIds.map(id => blockManager.getLocations(id))*/
val locations = blockManager.getLocations(blockIds)
HashMap(blockIds.zip(locations):_*)
}
override def splits = splits_
override def compute(split: Split): Iterator[T] = {
val blockManager = SparkEnv.get.blockManager
val blockId = split.asInstanceOf[BlockRDDSplit].blockId
blockManager.get(blockId) match {
case Some(block) => block.asInstanceOf[Iterator[T]]
case None =>
throw new Exception("Could not compute split, block " + blockId + " not found")
}
}
override def preferredLocations(split: Split) =
locations_(split.asInstanceOf[BlockRDDSplit].blockId)
override val dependencies: List[Dependency[_]] = Nil
}
@@ -409,6 +409,11 @@ class SparkContext(
* various Spark features.
*/
object SparkContext {
// TODO: temporary hack for using HDFS as input in streaming
var inputFile: String = null
var idealPartitions: Int = 1
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
......
@@ -8,7 +8,7 @@ object SparkBuild extends Build {
// "1.0.1" for Apache releases, or "0.20.2-cdh3u3" for Cloudera Hadoop.
val HADOOP_VERSION = "0.20.205.0"
lazy val root = Project("root", file("."), settings = sharedSettings) aggregate(core, repl, examples, bagel)
lazy val root = Project("root", file("."), settings = sharedSettings) aggregate(core, repl, examples, bagel, streaming)
lazy val core = Project("core", file("core"), settings = coreSettings)
@@ -18,6 +18,8 @@ object SparkBuild extends Build {
lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn (core)
lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn (core)
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.spark-project",
version := "0.6.0-SNAPSHOT",
@@ -82,6 +84,8 @@ object SparkBuild extends Build {
def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
def streamingSettings = sharedSettings ++ Seq(name := "spark-streaming")
def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq(
mergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
......
@@ -46,6 +46,7 @@ CORE_DIR="$FWDIR/core"
REPL_DIR="$FWDIR/repl"
EXAMPLES_DIR="$FWDIR/examples"
BAGEL_DIR="$FWDIR/bagel"
STREAMING_DIR="$FWDIR/streaming"
# Build up classpath
CLASSPATH="$SPARK_CLASSPATH"
@@ -55,6 +56,7 @@ CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH+=":$CORE_DIR/src/main/resources"
CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/classes"
for jar in `find $CORE_DIR/lib -name '*jar'`; do
CLASSPATH+=":$jar"
done
......
Hello world!
What's up?
There is no cow level
#!/bin/bash
./run spark.stream.SentenceGenerator localhost 7078 sentences.txt 1
package spark.stream
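// Identifies one block of a streaming RDS: source name, time interval, and partition index.
// toString and BlockID.parse round-trip the ID through a '-'-separated string.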
case class BlockID(sRds: String, sInterval: Interval, sPartition: Int) {
override def toString : String = (
sRds + BlockID.sConnector +
sInterval.beginTime + BlockID.sConnector +
sInterval.endTime + BlockID.sConnector +
sPartition
)
}
object BlockID {
val sConnector = '-'
def parse(name : String) = BlockID(
name.split(BlockID.sConnector)(0),
new Interval(name.split(BlockID.sConnector)(1).toLong,
name.split(BlockID.sConnector)(2).toLong),
name.split(BlockID.sConnector)(3).toInt)
}
\ No newline at end of file
package spark.stream
import spark.Logging
import scala.collection.mutable.{ArrayBuffer, SynchronizedQueue}
import java.net._
import java.io._
import java.nio._
import java.nio.charset._
import java.nio.channels._
import java.nio.channels.spi._
abstract class ConnectionHandler(host: String, port: Int, connect: Boolean)
extends Thread with Logging {
val selector = SelectorProvider.provider.openSelector()
val interestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
initLogging()
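// Select loop: apply any queued interest-ops changes, block in select(), then dispatch
// each ready key to accept/finishConnect/read/write. Subclasses override ready(), and
// optionally read() and write(), to implement the actual protocol.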
override def run() {
try {
if (connect) {
connect()
} else {
listen()
}
var interrupted = false
while(!interrupted) {
preSelect()
while(!interestChangeRequests.isEmpty) {
val (key, ops) = interestChangeRequests.dequeue
val lastOps = key.interestOps()
key.interestOps(ops)
def intToOpStr(op: Int): String = {
val opStrs = new ArrayBuffer[String]()
if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
}
logTrace("Changed ops from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
}
selector.select()
interrupted = Thread.currentThread.isInterrupted
val selectedKeys = selector.selectedKeys().iterator()
while (selectedKeys.hasNext) {
val key = selectedKeys.next.asInstanceOf[SelectionKey]
selectedKeys.remove()
if (key.isValid) {
if (key.isAcceptable) {
accept(key)
} else if (key.isConnectable) {
finishConnect(key)
} else if (key.isReadable) {
read(key)
} else if (key.isWritable) {
write(key)
}
}
}
}
} catch {
case e: Exception => {
logError("Error in select loop", e)
}
}
}
def connect() {
val socketAddress = new InetSocketAddress(host, port)
val channel = SocketChannel.open()
channel.configureBlocking(false)
channel.socket.setReuseAddress(true)
channel.socket.setTcpNoDelay(true)
channel.connect(socketAddress)
channel.register(selector, SelectionKey.OP_CONNECT)
logInfo("Initiating connection to [" + socketAddress + "]")
}
def listen() {
val channel = ServerSocketChannel.open()
channel.configureBlocking(false)
channel.socket.setReuseAddress(true)
channel.socket.setReceiveBufferSize(256 * 1024)
channel.socket.bind(new InetSocketAddress(port))
channel.register(selector, SelectionKey.OP_ACCEPT)
logInfo("Listening on port " + port)
}
def finishConnect(key: SelectionKey) {
try {
val channel = key.channel.asInstanceOf[SocketChannel]
val address = channel.socket.getRemoteSocketAddress
channel.finishConnect()
logInfo("Connected to [" + host + ":" + port + "]")
ready(key)
} catch {
case e: IOException => {
logError("Error finishing connect to " + host + ":" + port)
close(key)
}
}
}
def accept(key: SelectionKey) {
try {
val serverChannel = key.channel.asInstanceOf[ServerSocketChannel]
val channel = serverChannel.accept()
val address = channel.socket.getRemoteSocketAddress
channel.configureBlocking(false)
logInfo("Accepted connection from [" + address + "]")
ready(channel.register(selector, 0))
} catch {
case e: IOException => {
logError("Error accepting connection", e)
}
}
}
def changeInterest(key: SelectionKey, ops: Int) {
logTrace("Added request to change ops to " + ops)
interestChangeRequests += ((key, ops))
}
def ready(key: SelectionKey)
def preSelect() {
}
def read(key: SelectionKey) {
throw new UnsupportedOperationException("Cannot read on connection of type " + this.getClass.toString)
}
def write(key: SelectionKey) {
throw new UnsupportedOperationException("Cannot write on connection of type " + this.getClass.toString)
}
def close(key: SelectionKey) {
try {
key.channel.close()
key.cancel()
Thread.currentThread.interrupt
} catch {
case e: Exception => logError("Error closing connection", e)
}
}
}
package spark.stream
import spark.SparkContext
import SparkContext._
import SparkStreamContext._
import spark.storage.StorageLevel
import scala.util.Sorting
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.Queue
import java.lang.{Long => JLong}
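// Streaming benchmark: merges several test sentence streams, counts words per partition
// with a raw java.util.HashMap, reduces over a 10-second window sliding every second,
// and reports the top K words (partial top-K per partition, then a final top-K on the driver).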
object DumbTopKWordCount2_Special {
def moreWarmup(sc: SparkContext) {
(0 until 20).foreach {i =>
sc.parallelize(1 to 20000000, 500)
.map(_ % 100).map(_.toString)
.map(x => (x, 1)).reduceByKey(_ + _, 10)
.collect()
}
}
def main (args: Array[String]) {
if (args.length < 2) {
println ("Usage: SparkStreamContext <host> <# sentence streams>")
System.exit(1)
}
val ssc = new SparkStreamContext(args(0), "WordCount2")
val numSentenceStreams = if (args.length > 1) args(1).toInt else 1
if (args.length > 2) {
ssc.setTempDir(args(2))
}
GrepCount2.warmConnectionManagers(ssc.sc)
moreWarmup(ssc.sc)
val sentences = new UnifiedRDS(
(1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 1000)).toArray
)
def add(v1: JLong, v2: JLong) = (v1 + v2)
def subtract(v1: JLong, v2: JLong) = (v1 - v2)
def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, JLong)] = {
val map = new java.util.HashMap[String, JLong]
var i = 0
var j = 0
while (iter.hasNext) {
val s = iter.next()
i = 0
while (i < s.length) {
j = i
while (j < s.length && s.charAt(j) != ' ') {
j += 1
}
if (j > i) {
val w = s.substring(i, j)
val c = map.get(w)
if (c == null) {
map.put(w, 1)
} else {
map.put(w, c + 1)
}
}
i = j
while (i < s.length && s.charAt(i) == ' ') {
i += 1
}
}
}
map.toIterator
}
val wordCounts = sentences.mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
wordCounts.persist(StorageLevel.MEMORY_ONLY)
val windowedCounts = wordCounts.window(Seconds(10), Seconds(1)).reduceByKey(_ + _, 10)
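// Selects the k highest-count pairs from an iterator by keeping a small array sorted
// in descending order of count (insert at the end, then bubble the new entry up).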
def topK(data: Iterator[(String, JLong)], k: Int): Iterator[(String, JLong)] = {
val taken = new Array[(String, JLong)](k)
var i = 0
var len = 0
var done = false
var value: (String, JLong) = null
var swap: (String, JLong) = null
var count = 0
while(data.hasNext) {
value = data.next
count += 1
/*println("count = " + count)*/
if (len == 0) {
taken(0) = value
len = 1
} else if (len < k || value._2 > taken(len - 1)._2) {
if (len < k) {
len += 1
}
taken(len - 1) = value
i = len - 1
while(i > 0 && taken(i - 1)._2 < taken(i)._2) {
swap = taken(i)
taken(i) = taken(i-1)
taken(i - 1) = swap
i -= 1
}
}
}
println("Took " + len + " out of " + count + " items")
return taken.toIterator
}
val k = 10
val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
partialTopKWindowedCounts.foreachRDD(rdd => {
val collectedCounts = rdd.collect
println("Collected " + collectedCounts.size + " items")
topK(collectedCounts.toIterator, k).foreach(println)
})
/*
windowedCounts.filter(_ == null).foreachRDD(rdd => {
val count = rdd.count
println("# of nulls = " + count)
})*/
ssc.run
}
}
package spark.stream
import spark.SparkContext
import SparkContext._
import SparkStreamContext._
import spark.storage.StorageLevel
import scala.util.Sorting
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.Queue
import scala.collection.JavaConversions.mapAsScalaMap
import java.lang.{Long => JLong}
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
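// Same setup as DumbTopKWordCount2_Special, but without the top-K step: per-partition
// counting into a java.util.HashMap, reduceByKey over a 10-second window sliding every
// second, and a plain collect of each windowed result.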
object DumbWordCount2_Special {
def moreWarmup(sc: SparkContext) {
(0 until 20).foreach {i =>
sc.parallelize(1 to 20000000, 500)
.map(_ % 100).map(_.toString)
.map(x => (x, 1)).reduceByKey(_ + _, 10)
.collect()
}
}
def main (args: Array[String]) {
if (args.length < 2) {
println ("Usage: SparkStreamContext <host> <# sentence streams>")
System.exit(1)
}
val ssc = new SparkStreamContext(args(0), "WordCount2")
val numSentenceStreams = if (args.length > 1) args(1).toInt else 1
if (args.length > 2) {
ssc.setTempDir(args(2))
}
GrepCount2.warmConnectionManagers(ssc.sc)
moreWarmup(ssc.sc)
val sentences = new UnifiedRDS(
(1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 1000)).toArray
)
def add(v1: JLong, v2: JLong) = (v1 + v2)
def subtract(v1: JLong, v2: JLong) = (v1 - v2)
def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, JLong)] = {
val map = new java.util.HashMap[String, JLong]
var i = 0
var j = 0
while (iter.hasNext) {
val s = iter.next()
i = 0
while (i < s.length) {
j = i
while (j < s.length && s.charAt(j) != ' ') {
j += 1
}
if (j > i) {
val w = s.substring(i, j)
val c = map.get(w)
if (c == null) {
map.put(w, 1)
} else {
map.put(w, c + 1)
}
}
i = j
while (i < s.length && s.charAt(i) == ' ') {
i += 1
}
}
}
map.toIterator
}
val wordCounts = sentences.mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
wordCounts.persist(StorageLevel.MEMORY_ONLY)
val windowedCounts = wordCounts.window(Seconds(10), Seconds(1)).reduceByKey(_ + _, 10)
windowedCounts.foreachRDD(_.collect)
ssc.run
}
}
package spark.stream
import spark.Logging
import scala.collection.mutable.HashSet
import scala.actors._
import scala.actors.Actor._
import scala.actors.remote._
import scala.actors.remote.RemoteActor._
import org.apache.hadoop.fs._
import org.apache.hadoop.conf._
import org.apache.hadoop.io._
import org.apache.hadoop.mapred._
import org.apache.hadoop.util._
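// Polls rootDirectory every 100 ms for new files (ignoring names ending in "_tmp") and,
// for each new file, advances the logical clock by intervalDuration and notifies the
// scheduler with an InputGenerated message.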
class FileStreamReceiver (
inputName: String,
rootDirectory: String,
intervalDuration: Long)
extends Logging {
val pollInterval = 100
val sparkstreamScheduler = {
val host = System.getProperty("spark.master.host")
val port = System.getProperty("spark.master.port").toInt + 1
RemoteActor.select(Node(host, port), 'SparkStreamScheduler)
}
val directory = new Path(rootDirectory)
val fs = directory.getFileSystem(new Configuration())
val files = new HashSet[String]()
var time: Long = 0
def start() {
fs.mkdirs(directory)
files ++= getFiles()
actor {
logInfo("Monitoring directory - " + rootDirectory)
while(true) {
testFiles(getFiles())
Thread.sleep(pollInterval)
}
}
}
def getFiles(): Iterable[String] = {
fs.listStatus(directory).map(_.getPath.toString)
}
def testFiles(fileList: Iterable[String]) {
fileList.foreach(file => {
if (!files.contains(file)) {
if (!file.endsWith("_tmp")) {
notifyFile(file)
}
files += file
}
})
}
def notifyFile(file: String) {
logInfo("Notifying file " + file)
time += intervalDuration
val interval = Interval(LongTime(time), LongTime(time + intervalDuration))
sparkstreamScheduler ! InputGenerated(inputName, interval, file)
}
}
package spark.stream
import SparkStreamContext._
import scala.util.Sorting
import spark.SparkContext
import spark.storage.StorageLevel
object GrepCount {
var inputFile : String = null
var HDFS : String = null
var idealPartitions : Int = 0
def main (args: Array[String]) {
if (args.length != 4) {
println ("Usage: GrepCount <host> <HDFS> <Input file> <Ideal Partitions>")
System.exit(1)
}
HDFS = args(1)
inputFile = HDFS + args(2)
idealPartitions = args(3).toInt
println ("Input file: " + inputFile)
val ssc = new SparkStreamContext(args(0), "GrepCount")
SparkContext.idealPartitions = idealPartitions
SparkContext.inputFile = inputFile
val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000)
//sentences.print
val matching = sentences.filter(_.contains("light"))
matching.foreachRDD(rdd => println(rdd.count))
ssc.run
}
}
package spark.stream
import SparkStreamContext._
import scala.util.Sorting
import spark.SparkEnv
import spark.SparkContext
import spark.storage.StorageLevel
import spark.network.Message
import spark.network.ConnectionManagerId
import java.nio.ByteBuffer
object GrepCount2 {
def startSparkEnvs(sc: SparkContext) {
val dummy = sc.parallelize(0 to 1000, 100).persist(StorageLevel.DISK_AND_MEMORY)
sc.runJob(dummy, (_: Iterator[Int]) => {})
println("SparkEnvs started")
Thread.sleep(1000)
/*sc.runJob(sc.parallelize(0 to 1000, 100), (_: Iterator[Int]) => {})*/
}
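// Warms up the slaves' ConnectionManagers: every slave sends 5 MB buffer messages to
// every other slave for a few rounds, and the measured throughput of each round is printed.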
def warmConnectionManagers(sc: SparkContext) {
val slaveConnManagerIds = sc.parallelize(0 to 100, 100).map(
i => SparkEnv.get.connectionManager.id).collect().distinct
println("\nSlave ConnectionManagerIds")
slaveConnManagerIds.foreach(println)
println
Thread.sleep(1000)
val numSlaves = slaveConnManagerIds.size
val count = 3
val size = 5 * 1024 * 1024
val iterations = (500 * 1024 * 1024 / (numSlaves * size)).toInt
println("count = " + count + ", size = " + size + ", iterations = " + iterations)
(0 until count).foreach(i => {
val resultStrs = sc.parallelize(0 until numSlaves, numSlaves).map(i => {
val connManager = SparkEnv.get.connectionManager
val thisConnManagerId = connManager.id
/*connManager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
println("Received [" + msg + "] from [" + id + "]")
None
})*/
val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
buffer.flip
val startTime = System.currentTimeMillis
val futures = (0 until iterations).map(i => {
slaveConnManagerIds.filter(_ != thisConnManagerId).map(slaveConnManagerId => {
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
println("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
})
}).flatMap(x => x)
val results = futures.map(f => f())
val finishTime = System.currentTimeMillis
val mb = size * results.size / 1024.0 / 1024.0
val ms = finishTime - startTime
val resultStr = "Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"
println(resultStr)
System.gc()
resultStr
}).collect()
println("---------------------")
println("Run " + i)
resultStrs.foreach(println)
println("---------------------")
})
}
def main (args: Array[String]) {
if (args.length < 2) {
println ("Usage: GrepCount2 <host> <# sentence streams>")
System.exit(1)
}
val ssc = new SparkStreamContext(args(0), "GrepCount2")
val numSentenceStreams = if (args.length > 1) args(1).toInt else 1
if (args.length > 2) {
ssc.setTempDir(args(2))
}
/*startSparkEnvs(ssc.sc)*/
warmConnectionManagers(ssc.sc)
val sentences = new UnifiedRDS(
(1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-"+i, 500)).toArray
)
val matching = sentences.filter(_.contains("light"))
matching.foreachRDD(rdd => println(rdd.count))
ssc.run
}
}
package spark.stream
import SparkStreamContext._
import scala.util.Sorting
import spark.SparkContext
import spark.storage.StorageLevel
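// Like GrepCount, but uses rdd.countApprox(timeout) and prints both the initial
// (approximate) and final counts, tab-separated, together with their timing.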
object GrepCountApprox {
var inputFile : String = null
var hdfs : String = null
var idealPartitions : Int = 0
def main (args: Array[String]) {
if (args.length != 5) {
println ("Usage: GrepCountApprox <host> <HDFS> <Input file> <Ideal Partitions> <Timeout>")
System.exit(1)
}
hdfs = args(1)
inputFile = hdfs + args(2)
idealPartitions = args(3).toInt
val timeout = args(4).toLong
println ("Input file: " + inputFile)
val ssc = new SparkStreamContext(args(0), "GrepCount")
SparkContext.idealPartitions = idealPartitions
SparkContext.inputFile = inputFile
ssc.setTempDir(hdfs + "/tmp")
val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000)
//sentences.print
val matching = sentences.filter(_.contains("light"))
var i = 0
val startTime = System.currentTimeMillis
matching.foreachRDD { rdd =>
val myNum = i
val result = rdd.countApprox(timeout)
val initialTime = (System.currentTimeMillis - startTime) / 1000.0
printf("APPROX\t%.2f\t%d\tinitial\t%.1f\t%.1f\n", initialTime, myNum, result.initialValue.mean,
result.initialValue.high - result.initialValue.low)
result.onComplete { r =>
val finalTime = (System.currentTimeMillis - startTime) / 1000.0
printf("APPROX\t%.2f\t%d\tfinal\t%.1f\t0.0\t%.1f\n", finalTime, myNum, r.mean, finalTime - initialTime)
}
i += 1
}
ssc.run
}
}
package spark.stream
import scala.collection.mutable.Map
object IdealPerformance {
val base: String = "The medium researcher counts around the pinched troop The empire breaks " +
"Matei Matei announces HY with a theorem "
def main (args: Array[String]) {
val sentences: String = base * 100000
for (i <- 1 to 30) {
val start = System.nanoTime
val words = sentences.split(" ")
val pairs = words.map(word => (word, 1))
val counts = Map[String, Int]()
println("Job " + i + " position A at " + (System.nanoTime - start) / 1e9)
pairs.foreach((pair) => {
var t = counts.getOrElse(pair._1, 0)
counts(pair._1) = t + pair._2
})
println("Job " + i + " position B at " + (System.nanoTime - start) / 1e9)
for ((word, count) <- counts) {
print(word + " " + count + "; ")
}
println
println("Job " + i + " finished in " + (System.nanoTime - start) / 1e9)
}
}
}
\ No newline at end of file
package spark.stream
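// A time interval [beginTime, endTime] with shift (+, +=, next) and ordering helpers.
// Comparisons are only defined between intervals of equal duration.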
case class Interval (val beginTime: Time, val endTime: Time) {
def this(beginMs: Long, endMs: Long) = this(new LongTime(beginMs), new LongTime(endMs))
def duration(): Time = endTime - beginTime
def += (time: Time) {
beginTime += time
endTime += time
this
}
def + (time: Time): Interval = {
new Interval(beginTime + time, endTime + time)
}
def < (that: Interval): Boolean = {
if (this.duration != that.duration) {
throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]")
}
this.endTime < that.endTime
}
def <= (that: Interval) = (this < that || this == that)
def > (that: Interval) = !(this <= that)
def >= (that: Interval) = !(this < that)
def next(): Interval = {
this + (endTime - beginTime)
}
def isZero() = (beginTime.isZero && endTime.isZero)
def toFormattedString = beginTime.toFormattedString + "-" + endTime.toFormattedString
override def toString = "[" + beginTime + ", " + endTime + "]"
}
object Interval {
/*
implicit def longTupleToInterval (longTuple: (Long, Long)) =
Interval(longTuple._1, longTuple._2)
implicit def intTupleToInterval (intTuple: (Int, Int)) =
Interval(intTuple._1, intTuple._2)
implicit def string2Interval (str: String): Interval = {
val parts = str.split(",")
if (parts.length == 1)
return Interval.zero
return Interval (parts(0).toInt, parts(1).toInt)
}
def getInterval (timeMs: Long, intervalDurationMs: Long): Interval = {
val intervalBeginMs = timeMs / intervalDurationMs * intervalDurationMs
Interval(intervalBeginMs, intervalBeginMs + intervalDurationMs)
}
*/
def zero() = new Interval (Time.zero, Time.zero)
def currentInterval(intervalDuration: LongTime): Interval = {
val time = LongTime(System.currentTimeMillis)
val intervalBegin = time.floor(intervalDuration)
Interval(intervalBegin, intervalBegin + intervalDuration)
}
}
package spark.stream
class Job(val time: Time, func: () => _) {
val id = Job.getNewId()
def run() {
func()
}
override def toString = "SparkStream Job " + id + ":" + time
}
object Job {
var lastId = 1
def getNewId() = synchronized {
lastId += 1
lastId
}
}
package spark.stream
import spark.SparkEnv
import spark.Logging
import scala.collection.mutable.PriorityQueue
import scala.actors._
import scala.actors.Actor._
import scala.actors.remote._
import scala.actors.remote.RemoteActor._
import scala.actors.scheduler.ResizableThreadPoolScheduler
import scala.actors.scheduler.ForkJoinScheduler
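// scala.actors-based job manager: jobs sit in a priority queue ordered so that the job
// with the earliest time is dequeued first, and each dequeued job is handed to an idle
// JobHandler actor, which runs it and replies with JobCompleted.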
sealed trait JobManagerMessage
case class RunJob(job: Job) extends JobManagerMessage
case class JobCompleted(handlerId: Int) extends JobManagerMessage
class JobHandler(ssc: SparkStreamContext, val id: Int) extends DaemonActor with Logging {
var busy = false
def act() {
loop {
receive {
case job: Job => {
SparkEnv.set(ssc.env)
try {
logInfo("Starting " + job)
job.run()
logInfo("Finished " + job)
if (job.time.isInstanceOf[LongTime]) {
val longTime = job.time.asInstanceOf[LongTime]
logInfo("Total pushing + skew + processing delay for " + longTime + " is " +
(System.currentTimeMillis - longTime.milliseconds) / 1000.0 + " s")
}
} catch {
case e: Exception => logError("SparkStream job failed", e)
}
busy = false
reply(JobCompleted(id))
}
}
}
}
}
class JobManager(ssc: SparkStreamContext, numThreads: Int = 2) extends DaemonActor with Logging {
implicit private val jobOrdering = new Ordering[Job] {
override def compare(job1: Job, job2: Job): Int = {
if (job1.time < job2.time) {
return 1
} else if (job2.time < job1.time) {
return -1
} else {
return 0
}
}
}
private val jobs = new PriorityQueue[Job]()
private val handlers = (0 until numThreads).map(i => new JobHandler(ssc, i))
def act() {
handlers.foreach(_.start)
loop {
receive {
case RunJob(job) => {
jobs += job
logInfo("Job " + job + " submitted")
runJob()
}
case JobCompleted(handlerId) => {
runJob()
}
}
}
}
def runJob(): Unit = {
logInfo("Attempting to allocate job ")
if (jobs.size > 0) {
handlers.find(!_.busy).foreach(handler => {
val job = jobs.dequeue
logInfo("Allocating job " + job + " to handler " + handler.id)
handler.busy = true
handler ! job
})
}
}
}
object JobManager {
def main(args: Array[String]) {
val ssc = new SparkStreamContext("local[4]", "JobManagerTest")
val jobManager = new JobManager(ssc)
jobManager.start()
val t = System.currentTimeMillis
for (i <- 1 to 10) {
jobManager ! RunJob(new Job(
LongTime(i),
() => {
Thread.sleep(500)
println("Job " + i + " took " + (System.currentTimeMillis - t) + " ms")
}
))
}
Thread.sleep(6000)
}
}
package spark.stream
import spark.{Logging, SparkEnv}
import java.util.concurrent.Executors
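// Thread-pool variant of JobManager: runs each Job on a fixed-size executor and logs the
// end-to-end delay for LongTime-stamped jobs.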
class JobManager2(ssc: SparkStreamContext, numThreads: Int = 1) extends Logging {
class JobHandler(ssc: SparkStreamContext, job: Job) extends Runnable {
def run() {
SparkEnv.set(ssc.env)
try {
logInfo("Starting " + job)
job.run()
logInfo("Finished " + job)
if (job.time.isInstanceOf[LongTime]) {
val longTime = job.time.asInstanceOf[LongTime]
logInfo("Total notification + skew + processing delay for " + longTime + " is " +
(System.currentTimeMillis - longTime.milliseconds) / 1000.0 + " s")
if (System.getProperty("spark.stream.distributed", "false") == "true") {
TestInputBlockTracker.setEndTime(job.time)
}
}
} catch {
case e: Exception => logError("SparkStream job failed", e)
}
}
}
initLogging()
val jobExecutor = Executors.newFixedThreadPool(numThreads)
def runJob(job: Job) {
jobExecutor.execute(new JobHandler(ssc, job))
}
}
package spark.stream
import spark.Logging
import spark.storage.StorageLevel
import scala.math._
import scala.collection.mutable.{Queue, HashMap, ArrayBuffer}
import scala.actors._
import scala.actors.Actor._
import scala.actors.remote._
import scala.actors.remote.RemoteActor._
import java.io.BufferedWriter
import java.io.OutputStreamWriter
import org.apache.hadoop.fs._
import org.apache.hadoop.conf._
import org.apache.hadoop.io._
import org.apache.hadoop.mapred._
import org.apache.hadoop.util._
/*import akka.actor.Actor._*/
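// Receives data as a remote scala.actors actor, groups incoming tuples into per-interval
// buckets in an Inbox, and on each interval boundary writes the filled buckets either to
// the BlockManager or to files, then notifies the Akka-based SparkStreamScheduler.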
class NetworkStreamReceiver[T: ClassManifest] (
inputName: String,
intervalDuration: Time,
splitId: Int,
ssc: SparkStreamContext,
tempDirectory: String)
extends DaemonActor
with Logging {
/**
* Assume all data coming in has non-decreasing timestamp.
*/
final class Inbox[T: ClassManifest] (intervalDuration: Time) {
var currentBucket: (Interval, ArrayBuffer[T]) = null
val filledBuckets = new Queue[(Interval, ArrayBuffer[T])]()
def += (tuple: (Time, T)) = addTuple(tuple)
def addTuple(tuple: (Time, T)) {
val (time, data) = tuple
val interval = getInterval (time)
filledBuckets.synchronized {
if (currentBucket == null) {
currentBucket = (interval, new ArrayBuffer[T]())
}
if (interval != currentBucket._1) {
filledBuckets += currentBucket
currentBucket = (interval, new ArrayBuffer[T]())
}
currentBucket._2 += data
}
}
def getInterval(time: Time): Interval = {
val intervalBegin = time.floor(intervalDuration)
Interval (intervalBegin, intervalBegin + intervalDuration)
}
def hasFilledBuckets(): Boolean = {
filledBuckets.synchronized {
return filledBuckets.size > 0
}
}
def popFilledBucket(): (Interval, ArrayBuffer[T]) = {
filledBuckets.synchronized {
if (filledBuckets.size == 0) {
return null
}
return filledBuckets.dequeue()
}
}
}
val inbox = new Inbox[T](intervalDuration)
lazy val sparkstreamScheduler = {
val host = System.getProperty("spark.master.host")
val port = System.getProperty("spark.master.port").toInt
val url = "akka://spark@%s:%s/user/SparkStreamScheduler".format(host, port)
ssc.actorSystem.actorFor(url)
}
/*sparkstreamScheduler ! Test()*/
val intervalDurationMillis = intervalDuration.asInstanceOf[LongTime].milliseconds
val useBlockManager = true
initLogging()
override def act() {
// register the InputReceiver
val port = 7078
RemoteActor.alive(port)
RemoteActor.register(Symbol("NetworkStreamReceiver-"+inputName), self)
logInfo("Registered actor on port " + port)
loop {
reactWithin (getSleepTime) {
case TIMEOUT =>
flushInbox()
case data =>
val t = data.asInstanceOf[T]
inbox += (getTimeFromData(t), t)
}
}
}
def getSleepTime(): Long = {
(System.currentTimeMillis / intervalDurationMillis + 1) *
intervalDurationMillis - System.currentTimeMillis
}
def getTimeFromData(data: T): Time = {
LongTime(System.currentTimeMillis)
}
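// Drains all filled buckets: each bucket's data is stored (block manager or disk) and the
// scheduler is told that input for that interval has been generated.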
def flushInbox() {
while (inbox.hasFilledBuckets) {
inbox.synchronized {
val (interval, data) = inbox.popFilledBucket()
val dataArray = data.toArray
logInfo("Received " + dataArray.length + " items at interval " + interval)
val reference = {
if (useBlockManager) {
writeToBlockManager(dataArray, interval)
} else {
writeToDisk(dataArray, interval)
}
}
if (reference != null) {
logInfo("Notifying scheduler")
sparkstreamScheduler ! InputGenerated(inputName, interval, reference.toString)
}
}
}
}
def writeToDisk(data: Array[T], interval: Interval): String = {
try {
// TODO(Haoyuan): For the current test, the following file-writing lines could be
// commented out.
val fs = new Path(tempDirectory).getFileSystem(new Configuration())
val inputDir = new Path(
tempDirectory,
inputName + "-" + interval.toFormattedString)
val inputFile = new Path(inputDir, "part-" + splitId)
logInfo("Writing to file " + inputFile)
if (System.getProperty("spark.fake", "false") != "true") {
val writer = new BufferedWriter(new OutputStreamWriter(fs.create(inputFile, true)))
data.foreach(x => writer.write(x.toString + "\n"))
writer.close()
} else {
logInfo("Fake file")
}
inputFile.toString
} catch {
case e: Exception =>
logError("Exception writing to file at interval " + interval + ": " + e.getMessage, e)
null
}
}
def writeToBlockManager(data: Array[T], interval: Interval): String = {
try {
val blockId = inputName + "-" + interval.toFormattedString + "-" + splitId
if (System.getProperty("spark.fake", "false") != "true") {
logInfo("Writing as block " + blockId )
ssc.env.blockManager.put(blockId.toString, data.toIterator, StorageLevel.DISK_AND_MEMORY)
} else {
logInfo("Fake block")
}
blockId
} catch {
case e: Exception =>
logError("Exception writing to block manager at interval " + interval + ": " + e.getMessage, e)
null
}
}
}