Commit a1b8dd53 authored by Tathagata Das

Added StreamingContext.getOrCreate for automatic recovery, and added a RecoverableNetworkWordCount example to use it.
parent a8729770
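As context for the change below, here is a minimal sketch of how the new StreamingContext.getOrCreate API is intended to be used, distilled from the RecoverableNetworkWordCount example added in this commit. The object name, checkpoint directory, master URL, host, and port are hypothetical placeholders, not part of the commit.

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object GetOrCreateSketch {
  def main(args: Array[String]) {
    // Hypothetical checkpoint directory; any path on a supported filesystem works.
    val checkpointDir = "/tmp/streaming-checkpoint"

    // Factory that builds a fresh context; getOrCreate invokes it only when
    // no checkpoint is found under checkpointDir.
    def createContext(): StreamingContext = {
      val ssc = new StreamingContext("local[2]", "GetOrCreateSketch", Seconds(1))
      val lines = ssc.socketTextStream("localhost", 9999)
      lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _).print()
      ssc
    }

    // Recover the context (and its DStream graph) from the checkpoint if one exists,
    // otherwise build a new one; getOrCreate enables checkpointing on the fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
  }
}

On the first run no checkpoint exists, so the factory is called and checkpointing is enabled; after a driver restart, the context is reconstructed from the checkpoint data instead of being rebuilt from scratch.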
@@ -47,7 +47,7 @@ private[spark] class SparkDeploySchedulerBackend(
    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
      System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ACTOR_NAME)
-   val args = Seq(driverUrl, "{{WORKER_URL}}", "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
+   val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
    val command = Command(
      "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
    val sparkHome = sc.getSparkHome().getOrElse(null)
......
package org.apache.spark.streaming.examples

import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.util.IntParam
import java.io.File
import org.apache.spark.rdd.RDD
import com.google.common.io.Files
import java.nio.charset.Charset

object RecoverableNetworkWordCount {

  def createContext(master: String, ip: String, port: Int, outputPath: String) = {
    val outputFile = new File(outputPath)
    if (outputFile.exists()) outputFile.delete()

    // Create the context with a 1 second batch size
    println("Creating new context")
    val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1),
      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))

    // Create a NetworkInputDStream on target ip:port and count the
    // words in the input stream of \n-delimited text (e.g. generated by 'nc')
    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.foreach((rdd: RDD[(String, Int)], time: Time) => {
      val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
      println(counts)
      println("Appending to " + outputFile.getAbsolutePath)
      Files.append(counts + "\n", outputFile, Charset.defaultCharset())
    })
    ssc
  }

  def main(args: Array[String]) {
    if (args.length != 5) {
      System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
      System.err.println(
        """
          |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-directory>
          |
          |In local mode, <master> should be 'local[n]' with n > 1
          |Both <checkpoint-directory> and <output-directory> should be full paths
        """.stripMargin
      )
      System.exit(1)
    }
    val Array(master, ip, IntParam(port), checkpointDirectory, outputPath) = args
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(master, ip, port, outputPath)
      })
    ssc.start()
  }
}
@@ -24,7 +24,7 @@ import java.util.concurrent.RejectedExecutionException
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.MetadataCleaner
@@ -141,9 +141,15 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
private[streaming]
object CheckpointReader extends Logging {

  def doesCheckpointExist(path: String): Boolean = {
    val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"))
    val fs = new Path(path).getFileSystem(new Configuration())
    (attempts.count(p => fs.exists(p)) > 1)
  }

  def read(path: String): Checkpoint = {
    val fs = new Path(path).getFileSystem(new Configuration())
-   val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
+   val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"))
    val compressionCodec = CompressionCodec.createCodec()
@@ -175,7 +181,7 @@ object CheckpointReader extends Logging {
      }
    })
-   throw new Exception("Could not read checkpoint from path '" + path + "'")
+   throw new SparkException("Could not read checkpoint from path '" + path + "'")
  }
}
......
@@ -570,12 +570,28 @@ class StreamingContext private (
}

-object StreamingContext {
+object StreamingContext extends Logging {

  implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = {
    new PairDStreamFunctions[K, V](stream)
  }

  /**
   * Either recreate a StreamingContext from checkpoint data in `checkpointPath`, or,
   * if no checkpoint exists there, create a new StreamingContext with `creatingFunc`
   * and set it to checkpoint into `checkpointPath`.
   */
  def getOrCreate(
      checkpointPath: String,
      creatingFunc: () => StreamingContext,
      createOnCheckpointError: Boolean = false
    ): StreamingContext = {
    if (CheckpointReader.doesCheckpointExist(checkpointPath)) {
      logInfo("Creating streaming context from checkpoint file")
      new StreamingContext(checkpointPath)
    } else {
      logInfo("Creating new streaming context")
      val ssc = creatingFunc()
      ssc.checkpoint(checkpointPath)
      ssc
    }
  }

  protected[streaming] def createNewSparkContext(
    master: String,
    appName: String,
......