diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index e53d6907bc404b48a3bca3537a0bdb5e862c2dd2..79b0d81af52b54474bd480879ec0ae23483a425a 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging {
       val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")
 
       // Verify that this has only anchors and span (we are wrapping in span)
-      val allowedNodeLabels = Set("a", "span")
+      val allowedNodeLabels = Set("a", "span", "br")
       val illegalNodes = xml \\ "_"  filterNot { case node: Node =>
         allowedNodeLabels.contains(node.label)
       }
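
Note on the UIUtils change above: "br" is whitelisted because the new getBatchDescriptionString added to StreamExecution below separates the query name, id, runId and batch id with <br/> tags; without it, makeDescription would treat the description as illegal HTML and the web UI would show the escaped string on a single line. A minimal sketch of how the whitelist check behaves, assuming scala-xml on the classpath (isRenderableDescription is a hypothetical helper written for this note, not part of the patch):

    import scala.xml.XML

    // Hypothetical helper mirroring the check in UIUtils.makeDescription: a description is
    // rendered as HTML only if every element it contains is an <a>, <span> or (now) <br>.
    def isRenderableDescription(desc: String): Boolean = {
      val allowedNodeLabels = Set("a", "span", "br")
      try {
        val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")
        (xml \\ "_").forall(node => allowedNodeLabels.contains(node.label))
      } catch {
        case _: Exception => false // not well-formed XML; the UI falls back to escaped text
      }
    }

    // "memStream<br/>id = 1<br/>runId = 2<br/>batch = 0" now passes;
    // "<script>alert(1)</script>" is still rejected.

getBatchDescriptionString deliberately emits the self-closing form <br/> so that the whole description remains well-formed XML for XML.loadString.
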
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index bcf0d970f7ec1afc8d1b1921e4bf794651196f19..affc2018c43cbcf9e4ad48f707b2f8cc833ac1c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -252,6 +252,8 @@ class StreamExecution(
    */
   private def runBatches(): Unit = {
     try {
+      sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString,
+        interruptOnCancel = true)
       if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
@@ -289,6 +291,7 @@ class StreamExecution(
               if (currentBatchId < 0) {
                 // We'll do this initialization only once
                 populateStartOffsets(sparkSessionToRunBatches)
+                sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
                 logDebug(s"Stream running from $committedOffsets to $availableOffsets")
               } else {
                 constructNextBatch()
@@ -308,6 +311,7 @@ class StreamExecution(
               logDebug(s"batch ${currentBatchId} committed")
               // We'll increase currentBatchId after we complete processing current batch's data
               currentBatchId += 1
+              sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
             } else {
               currentStatus = currentStatus.copy(isDataAvailable = false)
               updateStatusMessage("Waiting for data to arrive")
@@ -684,8 +688,11 @@ class StreamExecution(
     // intentionally
     state.set(TERMINATED)
     if (microBatchThread.isAlive) {
+      sparkSession.sparkContext.cancelJobGroup(runId.toString)
       microBatchThread.interrupt()
       microBatchThread.join()
+      // microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak
+      sparkSession.sparkContext.cancelJobGroup(runId.toString)
     }
     logInfo(s"Query $prettyIdString was stopped")
   }
@@ -825,6 +832,11 @@ class StreamExecution(
     }
   }
 
+  private def getBatchDescriptionString: String = {
+    val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
+    Option(name).map(_ + "<br/>").getOrElse("") +
+      s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
+  }
 }
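
The StreamExecution changes rely on SparkContext's job-group API: setJobGroup ties every job submitted from the micro-batch thread to the query's runId (with interruptOnCancel = true so cancellation also interrupts running tasks), setJobDescription refreshes the per-batch description shown in the UI, and stop() calls cancelJobGroup both before and after joining the thread because the thread may still launch one more job after the first cancellation. A self-contained sketch of that pattern, assuming an existing SparkContext; the names (runCancellableGroup, worker, groupId) are illustrative and not taken from the patch:

    import org.apache.spark.{SparkContext, TaskContext}

    // Illustrative only: the setJobGroup / cancelJobGroup pattern StreamExecution now uses.
    def runCancellableGroup(sc: SparkContext, groupId: String): Unit = {
      val worker = new Thread(new Runnable {
        override def run(): Unit = {
          // Every job submitted from this thread belongs to groupId from now on;
          // interruptOnCancel = true lets cancelJobGroup interrupt the running tasks.
          sc.setJobGroup(groupId, s"group = $groupId<br/>batch = init", interruptOnCancel = true)
          try {
            sc.parallelize(1 to 4).foreach { _ =>
              while (!TaskContext.get().isInterrupted()) Thread.sleep(100) // spin until cancelled
            }
          } catch {
            case _: Exception => () // cancellation surfaces here as a SparkException
          }
        }
      })
      worker.start()
      Thread.sleep(1000)          // crude wait for the job to start (illustration only)
      sc.cancelJobGroup(groupId)  // cancels all active jobs in the group
      worker.interrupt()
      worker.join()
      sc.cancelJobGroup(groupId)  // cancel once more in case the thread spawned another job
    }

This is also why the "calling stop() on a query cancels related jobs" test below loops on TaskContext.isInterrupted(): the task only terminates once cancelJobGroup(runId.toString) interrupts it.
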
 
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 13fe51a557733df974a5a80634c3129877990494..01ea62a9de4d5cc5745ea6ccf647c67b01cda924 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -25,6 +25,8 @@ import scala.util.control.ControlThrowable
 
 import org.apache.commons.io.FileUtils
 
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.ExplainCommand
@@ -500,6 +502,70 @@ class StreamSuite extends StreamTest {
       }
     }
   }
+
+  test("calling stop() on a query cancels related jobs") {
+    val input = MemoryStream[Int]
+    val query = input
+      .toDS()
+      .map { i =>
+        while (!org.apache.spark.TaskContext.get().isInterrupted()) {
+          // keep looping till interrupted by query.stop()
+          Thread.sleep(100)
+        }
+        i
+      }
+      .writeStream
+      .format("console")
+      .start()
+
+    input.addData(1)
+    // wait for jobs to start
+    eventually(timeout(streamingTimeout)) {
+      assert(sparkContext.statusTracker.getActiveJobIds().nonEmpty)
+    }
+
+    query.stop()
+    // make sure jobs are stopped
+    eventually(timeout(streamingTimeout)) {
+      assert(sparkContext.statusTracker.getActiveJobIds().isEmpty)
+    }
+  }
+
+  test("batch id is updated correctly in the job description") {
+    val queryName = "memStream"
+    @volatile var jobDescription: String = null
+    def assertDescContainsQueryNameAnd(batch: Integer): Unit = {
+      // wait for listener event to be processed
+      spark.sparkContext.listenerBus.waitUntilEmpty(streamingTimeout.toMillis)
+      assert(jobDescription.contains(queryName) && jobDescription.contains(s"batch = $batch"))
+    }
+
+    spark.sparkContext.addSparkListener(new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        jobDescription = jobStart.properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+      }
+    })
+
+    val input = MemoryStream[Int]
+    val query = input
+      .toDS()
+      .map(_ + 1)
+      .writeStream
+      .format("memory")
+      .queryName(queryName)
+      .start()
+
+    input.addData(1)
+    query.processAllAvailable()
+    assertDescContainsQueryNameAnd(batch = 0)
+    input.addData(2, 3)
+    query.processAllAvailable()
+    assertDescContainsQueryNameAnd(batch = 1)
+    input.addData(4)
+    query.processAllAvailable()
+    assertDescContainsQueryNameAnd(batch = 2)
+    query.stop()
+  }
 }
 
 abstract class FakeSource extends StreamSourceProvider {