diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 905bc723f69a9e27dcccedbe6399240c32f5168e..1361c30395b57d88fb672b3610cacd6cd998a81d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -38,6 +38,7 @@ class ForEachDStream[T: ClassTag] (
     parent.getOrCompute(time) match {
       case Some(rdd) =>
         val jobFunc = () => {
+          ssc.sparkContext.setCallSite(creationSite)
           foreachFunc(rdd, time)
         }
         Some(new Job(time, jobFunc))
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 4b49c4d25164575489ae7fdcedbce571d27d6c4c..9f352bdcb0893be6913999fa016d03ad79b37c1c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -336,16 +336,20 @@ package object testPackage extends Assertions {
 
       // Verify creation site of generated RDDs
       var rddGenerated = false
-      var rddCreationSiteCorrect = true
+      var rddCreationSiteCorrect = false
+      var foreachCallSiteCorrect = false
 
       inputStream.foreachRDD { rdd =>
         rddCreationSiteCorrect = rdd.creationSite == creationSite
+        foreachCallSiteCorrect =
+          rdd.sparkContext.getCallSite().shortForm.contains("StreamingContextSuite")
         rddGenerated = true
       }
       ssc.start()
 
       eventually(timeout(10000 millis), interval(10 millis)) {
         assert(rddGenerated && rddCreationSiteCorrect, "RDD creation site was not correct")
+        assert(rddGenerated && foreachCallSiteCorrect, "Call site in foreachRDD was not correct")
       }
     } finally {
       ssc.stop()