Skip to content
Snippets Groups Projects
Commit 6fc6cf88 authored by Kunal Khamar's avatar Kunal Khamar Committed by Shixiong Zhu
Browse files

[SPARK-20464][SS] Add a job group and description for streaming queries and...

[SPARK-20464][SS] Add a job group and description for streaming queries and fix cancellation of running jobs using the job group

## What changes were proposed in this pull request?

Job group: adding a job group is required to properly cancel running jobs related to a query.
Description: the new description makes it easier to group the batches of a query by sorting by name in the Spark Jobs UI.

## How was this patch tested?

- Unit tests
- UI screenshot

  - Order by job id:
![screen shot 2017-04-27 at 5 10 09 pm](https://cloud.githubusercontent.com/assets/7865120/25509468/15452274-2b6e-11e7-87ba-d929816688cf.png)

  - Order by description:
![screen shot 2017-04-27 at 5 10 22 pm](https://cloud.githubusercontent.com/assets/7865120/25509474/1c298512-2b6e-11e7-99b8-fef1ef7665c1.png)

  - Order by job id (no query name):
![screen shot 2017-04-27 at 5 21 33 pm](https://cloud.githubusercontent.com/assets/7865120/25509482/28c96dc8-2b6e-11e7-8df0-9d3cdbb05e36.png)

  - Order by description (no query name):
![screen shot 2017-04-27 at 5 21 44 pm](https://cloud.githubusercontent.com/assets/7865120/25509489/37674742-2b6e-11e7-9357-b5c38ec16ac4.png)

Author: Kunal Khamar <kkhamar@outlook.com>

Closes #17765 from kunalkhamar/sc-6696.
parent ab30590f
No related branches found
No related tags found
No related merge requests found
......@@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging {
val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")
// Verify that this has only anchors and span (we are wrapping in span)
val allowedNodeLabels = Set("a", "span")
val allowedNodeLabels = Set("a", "span", "br")
val illegalNodes = xml \\ "_" filterNot { case node: Node =>
allowedNodeLabels.contains(node.label)
}
......
......@@ -252,6 +252,8 @@ class StreamExecution(
*/
private def runBatches(): Unit = {
try {
sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString,
interruptOnCancel = true)
if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
}
......@@ -289,6 +291,7 @@ class StreamExecution(
if (currentBatchId < 0) {
// We'll do this initialization only once
populateStartOffsets(sparkSessionToRunBatches)
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
} else {
constructNextBatch()
......@@ -308,6 +311,7 @@ class StreamExecution(
logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
} else {
currentStatus = currentStatus.copy(isDataAvailable = false)
updateStatusMessage("Waiting for data to arrive")
......@@ -684,8 +688,11 @@ class StreamExecution(
// intentionally
state.set(TERMINATED)
if (microBatchThread.isAlive) {
sparkSession.sparkContext.cancelJobGroup(runId.toString)
microBatchThread.interrupt()
microBatchThread.join()
// microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak
sparkSession.sparkContext.cancelJobGroup(runId.toString)
}
logInfo(s"Query $prettyIdString was stopped")
}
......@@ -825,6 +832,11 @@ class StreamExecution(
}
}
private def getBatchDescriptionString: String = {
val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
Option(name).map(_ + "<br/>").getOrElse("") +
s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
}
}
......
......@@ -25,6 +25,8 @@ import scala.util.control.ControlThrowable
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.command.ExplainCommand
......@@ -500,6 +502,70 @@ class StreamSuite extends StreamTest {
}
}
}
test("calling stop() on a query cancels related jobs") {
val input = MemoryStream[Int]
val query = input
.toDS()
.map { i =>
while (!org.apache.spark.TaskContext.get().isInterrupted()) {
// keep looping till interrupted by query.stop()
Thread.sleep(100)
}
i
}
.writeStream
.format("console")
.start()
input.addData(1)
// wait for jobs to start
eventually(timeout(streamingTimeout)) {
assert(sparkContext.statusTracker.getActiveJobIds().nonEmpty)
}
query.stop()
// make sure jobs are stopped
eventually(timeout(streamingTimeout)) {
assert(sparkContext.statusTracker.getActiveJobIds().isEmpty)
}
}
test("batch id is updated correctly in the job description") {
val queryName = "memStream"
@volatile var jobDescription: String = null
def assertDescContainsQueryNameAnd(batch: Integer): Unit = {
// wait for listener event to be processed
spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
assert(jobDescription.contains(queryName) && jobDescription.contains(s"batch = $batch"))
}
spark.sparkContext.addSparkListener(new SparkListener {
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
jobDescription = jobStart.properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)
}
})
val input = MemoryStream[Int]
val query = input
.toDS()
.map(_ + 1)
.writeStream
.format("memory")
.queryName(queryName)
.start()
input.addData(1)
query.processAllAvailable()
assertDescContainsQueryNameAnd(batch = 0)
input.addData(2, 3)
query.processAllAvailable()
assertDescContainsQueryNameAnd(batch = 1)
input.addData(4)
query.processAllAvailable()
assertDescContainsQueryNameAnd(batch = 2)
query.stop()
}
}
abstract class FakeSource extends StreamSourceProvider {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment