Commit 69cd53ea authored by Tathagata Das

[SPARK-4601][Streaming] Set correct call site for streaming jobs so that it is displayed correctly on the Spark UI

When running NetworkWordCount, the description of each word count job is set to "getCallsite at DStream:xxx". It should instead be the line number in the streaming application of the output operation that led to the job being created. The cause is that the call site is set incorrectly in the thread that launches the jobs. This PR fixes that.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #3455 from tdas/streaming-callsite-fix and squashes the following commits:

69fc26f [Tathagata Das] Set correct call site for streaming jobs so that it is displayed correctly on the Spark UI
parent d2407601
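
The commit message attributes the wrong description to the call site being set in the wrong thread. A minimal sketch of that failure mode follows, assuming the call site behaves like plain per-thread state. `CallSiteSketch`, `runOnJobThread`, and the placeholder strings are hypothetical names for illustration, and the `ThreadLocal` is a stand-in, not Spark's implementation.

```scala
object CallSiteSketch {
  // Hypothetical stand-in for per-thread call-site state (an assumption, not Spark's code).
  // The default mimics the unhelpful description quoted in the commit message.
  private val callSite = new ThreadLocal[String] {
    override def initialValue(): String = "getCallSite at DStream:xxx"
  }

  def setCallSite(site: String): Unit = callSite.set(site)
  def getCallSite(): String = callSite.get()

  // Runs `job` on a fresh thread, as a job scheduler might, and returns its result.
  def runOnJobThread(job: () => String): String = {
    var result: String = null
    val t = new Thread(new Runnable { def run(): Unit = { result = job() } })
    t.start()
    t.join() // join() makes the write to `result` visible to the caller
    result
  }

  def main(args: Array[String]): Unit = {
    val creationSite = "foreachRDD at NetworkWordCount.scala" // placeholder value

    // Before the fix: the call site is set only on the thread that builds the job,
    // so the thread that runs the job never sees it.
    setCallSite(creationSite)
    val before = runOnJobThread(() => getCallSite())

    // After the fix: the job function sets the call site itself, on the running thread.
    val after = runOnJobThread { () =>
      setCallSite(creationSite)
      getCallSite()
    }

    println(s"before: $before") // before: getCallSite at DStream:xxx
    println(s"after:  $after")  // after:  foreachRDD at NetworkWordCount.scala
  }
}
```

Setting the value inside the function that the job thread executes mirrors the one-line change to `jobFunc` in the first hunk of this commit.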
@@ -38,6 +38,7 @@ class ForEachDStream[T: ClassTag] (
     parent.getOrCompute(time) match {
       case Some(rdd) =>
         val jobFunc = () => {
+          ssc.sparkContext.setCallSite(creationSite)
           foreachFunc(rdd, time)
         }
         Some(new Job(time, jobFunc))
......
@@ -336,16 +336,20 @@ package object testPackage extends Assertions {
       // Verify creation site of generated RDDs
       var rddGenerated = false
-      var rddCreationSiteCorrect = true
+      var rddCreationSiteCorrect = false
+      var foreachCallSiteCorrect = false
       inputStream.foreachRDD { rdd =>
         rddCreationSiteCorrect = rdd.creationSite == creationSite
+        foreachCallSiteCorrect =
+          rdd.sparkContext.getCallSite().shortForm.contains("StreamingContextSuite")
         rddGenerated = true
       }
       ssc.start()
       eventually(timeout(10000 millis), interval(10 millis)) {
         assert(rddGenerated && rddCreationSiteCorrect, "RDD creation site was not correct")
+        assert(rddGenerated && foreachCallSiteCorrect, "Call site in foreachRDD was not correct")
       }
     } finally {
       ssc.stop()
......