diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7421821e2601bacac6f2dafe8ab9713533402d48..67270c38fa1b56ec996b13eabe8ea8feb277f2b1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1787,10 +1787,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * has overridden the call site using `setCallSite()`, this will return the user's version.
    */
   private[spark] def getCallSite(): CallSite = {
-    Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite =>
-      val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("")
-      CallSite(shortCallSite, longCallSite)
-    }.getOrElse(Utils.getCallSite())
+    val callSite = Utils.getCallSite()
+    CallSite(
+      Option(getLocalProperty(CallSite.SHORT_FORM)).getOrElse(callSite.shortForm),
+      Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse(callSite.longForm)
+    )
   }
 
   /**
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
index 1a900007b696b7cfc76866086019b79c2130e0ab..79077e4a49e1a6d6318755611b6e6b1d4205d482 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
@@ -37,7 +37,7 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.collect()
     output += collected
-  }) {
+  }, false) {
 
   // This is to clear the output buffer every it is read from a checkpoint
   @throws(classOf[IOException])
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1da0b0a54df07174912658336b0b22eb9b9ecd22..1a6edf9473d84abf0f1f6237cd2f8dbe72020627 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -341,7 +341,7 @@ abstract class DStream[T: ClassTag] (
       // of RDD generation, else generate nothing.
       if (isTimeValid(time)) {
 
-        val rddOption = createRDDWithLocalProperties(time) {
+        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
           // Disable checks for existing output directories in jobs launched by the streaming
           // scheduler, since we may need to write output to an existing directory during checkpoint
           // recovery; see SPARK-4835 for more details. We need to have this call here because
@@ -373,27 +373,52 @@ abstract class DStream[T: ClassTag] (
   /**
    * Wrap a body of code such that the call site and operation scope
    * information are passed to the RDDs created in this body properly.
-   */
-  protected def createRDDWithLocalProperties[U](time: Time)(body: => U): U = {
+   * @param body RDD creation code to execute with certain local properties.
+   * @param time Current batch time that should be embedded in the scope names
+   * @param displayInnerRDDOps Whether the detailed callsites and scopes of the inner RDDs generated
+   *                           by `body` will be displayed in the UI; only the scope and callsite
+   *                           of the DStream operation that generated `this` will be displayed.
+   */
+  protected[streaming] def createRDDWithLocalProperties[U](
+      time: Time,
+      displayInnerRDDOps: Boolean)(body: => U): U = {
     val scopeKey = SparkContext.RDD_SCOPE_KEY
     val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
     // Pass this DStream's operation scope and creation site information to RDDs through
     // thread-local properties in our SparkContext. Since this method may be called from another
     // DStream, we need to temporarily store any old scope and creation site information to
     // restore them later after setting our own.
-    val prevCallSite = ssc.sparkContext.getCallSite()
+    val prevCallSite = CallSite(
+      ssc.sparkContext.getLocalProperty(CallSite.SHORT_FORM),
+      ssc.sparkContext.getLocalProperty(CallSite.LONG_FORM)
+    )
     val prevScope = ssc.sparkContext.getLocalProperty(scopeKey)
     val prevScopeNoOverride = ssc.sparkContext.getLocalProperty(scopeNoOverrideKey)
 
     try {
-      ssc.sparkContext.setCallSite(creationSite)
+      if (displayInnerRDDOps) {
+        // Unset the short form call site, so that generated RDDs get their own
+        ssc.sparkContext.setLocalProperty(CallSite.SHORT_FORM, null)
+        ssc.sparkContext.setLocalProperty(CallSite.LONG_FORM, null)
+      } else {
+        // Set the callsite, so that the generated RDDs get the DStream's call site and
+        // the internal RDD call sites do not get displayed
+        ssc.sparkContext.setCallSite(creationSite)
+      }
+
       // Use the DStream's base scope for this RDD so we can (1) preserve the higher level
       // DStream operation name, and (2) share this scope with other DStreams created in the
       // same operation. Disallow nesting so that low-level Spark primitives do not show up.
       // TODO: merge callsites with scopes so we can just reuse the code there
       makeScope(time).foreach { s =>
         ssc.sparkContext.setLocalProperty(scopeKey, s.toJson)
-        ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
+        if (displayInnerRDDOps) {
+          // Allow inner RDDs to add inner scopes
+          ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, null)
+        } else {
+          // Do not allow inner RDDs to override the scope set by DStream
+          ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
+        }
       }
 
       body
@@ -628,7 +653,7 @@ abstract class DStream[T: ClassTag] (
    */
   def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
     val cleanedF = context.sparkContext.clean(foreachFunc, false)
-    this.foreachRDD((r: RDD[T], t: Time) => cleanedF(r))
+    foreachRDD((r: RDD[T], t: Time) => cleanedF(r), displayInnerRDDOps = true)
   }
 
   /**
@@ -639,7 +664,23 @@ abstract class DStream[T: ClassTag] (
     // because the DStream is reachable from the outer object here, and because
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
-    new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
+    foreachRDD(foreachFunc, displayInnerRDDOps = true)
+  }
+
+  /**
+   * Apply a function to each RDD in this DStream. This is an output operator, so
+   * 'this' DStream will be registered as an output stream and therefore materialized.
+   * @param foreachFunc foreachRDD function
+   * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
+   *                           in the `foreachFunc` to be displayed in the UI. If `false`, then
+   *                           only the scopes and callsites of `foreachRDD` will override those
+   *                           of the RDDs on the display.
+   */
+  private def foreachRDD(
+      foreachFunc: (RDD[T], Time) => Unit,
+      displayInnerRDDOps: Boolean): Unit = {
+    new ForEachDStream(this,
+      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
   }
 
   /**
@@ -730,7 +771,7 @@ abstract class DStream[T: ClassTag] (
         // scalastyle:on println
       }
     }
-    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
+    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
   }
 
   /**
@@ -900,7 +941,7 @@ abstract class DStream[T: ClassTag] (
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsObjectFile(file)
     }
-    this.foreachRDD(saveFunc)
+    this.foreachRDD(saveFunc, displayInnerRDDOps = false)
   }
 
   /**
@@ -913,7 +954,7 @@ abstract class DStream[T: ClassTag] (
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsTextFile(file)
     }
-    this.foreachRDD(saveFunc)
+    this.foreachRDD(saveFunc, displayInnerRDDOps = false)
   }
 
   /**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index c109ceccc6989bde369fe3de02fc99bed7750e54..4410a9977c87b3d1b0b737f8c1334daf4f8f10a2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -22,10 +22,19 @@ import org.apache.spark.streaming.{Duration, Time}
 import org.apache.spark.streaming.scheduler.Job
 import scala.reflect.ClassTag
 
+/**
+ * An internal DStream used to represent output operations like DStream.foreachRDD.
+ * @param parent        Parent DStream
+ * @param foreachFunc   Function to apply on each RDD generated by the parent DStream
+ * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
+ *                           by `foreachFunc` will be displayed in the UI; only the scope and
+ *                           callsite of `DStream.foreachRDD` will be displayed.
+ */
 private[streaming]
 class ForEachDStream[T: ClassTag] (
     parent: DStream[T],
-    foreachFunc: (RDD[T], Time) => Unit
+    foreachFunc: (RDD[T], Time) => Unit,
+    displayInnerRDDOps: Boolean
   ) extends DStream[Unit](parent.ssc) {
 
   override def dependencies: List[DStream[_]] = List(parent)
@@ -37,8 +46,7 @@ class ForEachDStream[T: ClassTag] (
   override def generateJob(time: Time): Option[Job] = {
     parent.getOrCompute(time) match {
       case Some(rdd) =>
-        val jobFunc = () => createRDDWithLocalProperties(time) {
-          ssc.sparkContext.setCallSite(creationSite)
+        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
           foreachFunc(rdd, time)
         }
         Some(new Job(time, jobFunc))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 5eabdf63dc8d778552ba76881c80932d8ffd34b1..080bc873fa0a86e91c390146a4fb23592da25b18 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -51,4 +51,17 @@ class TransformedDStream[U: ClassTag] (
     }
     Some(transformedRDD)
   }
+
+  /**
+   * Wrap a body of code such that the call site and operation scope
+   * information are passed to the RDDs created in this body properly.
+   * This has been overriden to make sure that `displayInnerRDDOps` is always `true`, that is,
+   * the inner scopes and callsites of RDDs generated in `DStream.transform` are always
+   * displayed in the UI.
+   */
+  override protected[streaming] def createRDDWithLocalProperties[U](
+      time: Time,
+      displayInnerRDDOps: Boolean)(body: => U): U = {
+    super.createRDDWithLocalProperties(time, displayInnerRDDOps = true)(body)
+  }
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
index 8844c9d74b933bd227221b244c257deb64bfd6d9..bc223e648a4172156cc2f73efb51764dbafce9d0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.streaming
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 
-import org.apache.spark.{SparkContext, SparkFunSuite}
-import org.apache.spark.rdd.RDDOperationScope
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.ui.UIUtils
+import org.apache.spark.util.ManualClock
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 
 /**
  * Tests whether scope information is passed from DStream operations to RDDs correctly.
@@ -32,7 +35,9 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
   private val batchDuration: Duration = Seconds(1)
 
   override def beforeAll(): Unit = {
-    ssc = new StreamingContext(new SparkContext("local", "test"), batchDuration)
+    val conf = new SparkConf().setMaster("local").setAppName("test")
+    conf.set("spark.streaming.clock", classOf[ManualClock].getName())
+    ssc = new StreamingContext(new SparkContext(conf), batchDuration)
   }
 
   override def afterAll(): Unit = {
@@ -103,6 +108,8 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
 
   test("scoping nested operations") {
     val inputStream = new DummyInputDStream(ssc)
+    // countByKeyAndWindow internally uses reduceByKeyAndWindow, but only countByKeyAndWindow
+    // should appear in scope
     val countStream = inputStream.countByWindow(Seconds(10), Seconds(1))
     countStream.initialize(Time(0))
 
@@ -137,6 +144,57 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
     testStream(countStream)
   }
 
+  test("transform should allow RDD operations to be captured in scopes") {
+    val inputStream = new DummyInputDStream(ssc)
+    val transformedStream = inputStream.transform { _.map { _ -> 1}.reduceByKey(_ + _) }
+    transformedStream.initialize(Time(0))
+
+    val transformScopeBase = transformedStream.baseScope.map(RDDOperationScope.fromJson)
+    val transformScope1 = transformedStream.getOrCompute(Time(1000)).get.scope
+    val transformScope2 = transformedStream.getOrCompute(Time(2000)).get.scope
+    val transformScope3 = transformedStream.getOrCompute(Time(3000)).get.scope
+
+    // Assert that all children RDDs inherit the DStream operation name correctly
+    assertDefined(transformScopeBase, transformScope1, transformScope2, transformScope3)
+    assert(transformScopeBase.get.name === "transform")
+    assertNestedScopeCorrect(transformScope1.get, 1000)
+    assertNestedScopeCorrect(transformScope2.get, 2000)
+    assertNestedScopeCorrect(transformScope3.get, 3000)
+
+    def assertNestedScopeCorrect(rddScope: RDDOperationScope, batchTime: Long): Unit = {
+      assert(rddScope.name === "reduceByKey")
+      assert(rddScope.parent.isDefined)
+      assertScopeCorrect(transformScopeBase.get, rddScope.parent.get, batchTime)
+    }
+  }
+
+  test("foreachRDD should allow RDD operations to be captured in scope") {
+    val inputStream = new DummyInputDStream(ssc)
+    val generatedRDDs = new ArrayBuffer[RDD[(Int, Int)]]
+    inputStream.foreachRDD { rdd =>
+      generatedRDDs += rdd.map { _ -> 1}.reduceByKey(_ + _)
+    }
+    val batchCounter = new BatchCounter(ssc)
+    ssc.start()
+    val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+    clock.advance(3000)
+    batchCounter.waitUntilBatchesCompleted(3, 10000)
+    assert(generatedRDDs.size === 3)
+
+    val foreachBaseScope =
+      ssc.graph.getOutputStreams().head.baseScope.map(RDDOperationScope.fromJson)
+    assertDefined(foreachBaseScope)
+    assert(foreachBaseScope.get.name === "foreachRDD")
+
+    val rddScopes = generatedRDDs.map { _.scope }
+    assertDefined(rddScopes: _*)
+    rddScopes.zipWithIndex.foreach { case (rddScope, idx) =>
+      assert(rddScope.get.name === "reduceByKey")
+      assert(rddScope.get.parent.isDefined)
+      assertScopeCorrect(foreachBaseScope.get, rddScope.get.parent.get, (idx + 1) * 1000)
+    }
+  }
+
   /** Assert that the RDD operation scope properties are not set in our SparkContext. */
   private def assertPropertiesNotSet(): Unit = {
     assert(ssc != null)
@@ -149,19 +207,12 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
       baseScope: RDDOperationScope,
       rddScope: RDDOperationScope,
       batchTime: Long): Unit = {
-    assertScopeCorrect(baseScope.id, baseScope.name, rddScope, batchTime)
-  }
-
-  /** Assert that the given RDD scope inherits the base name and ID correctly. */
-  private def assertScopeCorrect(
-      baseScopeId: String,
-      baseScopeName: String,
-      rddScope: RDDOperationScope,
-      batchTime: Long): Unit = {
+    val (baseScopeId, baseScopeName) = (baseScope.id, baseScope.name)
     val formattedBatchTime = UIUtils.formatBatchTime(
       batchTime, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
     assert(rddScope.id === s"${baseScopeId}_$batchTime")
     assert(rddScope.name.replaceAll("\\n", " ") === s"$baseScopeName @ $formattedBatchTime")
+    assert(rddScope.parent.isEmpty)  // There should not be any higher scope
   }
 
   /** Assert that all the specified options are defined. */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 0d58a7b54412fc1f14b41c0a6625c5cd1928b452..a45c92d9c7bc88dfd4d42e70c3cf8360779cc48d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -98,7 +98,7 @@ class TestOutputStream[T: ClassTag](
   ) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.collect()
     output += collected
-  }) {
+  }, false) {
 
   // This is to clear the output buffer every it is read from a checkpoint
   @throws(classOf[IOException])
@@ -122,7 +122,7 @@ class TestOutputStreamWithPartitions[T: ClassTag](
   extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
     val collected = rdd.glom().collect().map(_.toSeq)
     output += collected
-  }) {
+  }, false) {
 
   // This is to clear the output buffer every it is read from a checkpoint
   @throws(classOf[IOException])