diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dd47c1dbbec0680ea126a6de8e6871e264585c6b..5ea0b48f6e4c402ae91258b4e0c3368c0741b55a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1015,7 +1015,8 @@ class DAGScheduler(
             val locs = taskIdToLocations(id)
             val part = stage.rdd.partitions(id)
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
-              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
+              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
+              Option(sc.applicationId), sc.applicationAttemptId)
           }
 
         case stage: ResultStage =>
@@ -1024,7 +1025,8 @@ class DAGScheduler(
             val part = stage.rdd.partitions(p)
             val locs = taskIdToLocations(id)
             new ResultTask(stage.id, stage.latestInfo.attemptId,
-              taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
+              taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
+              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
           }
       }
     } catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 609f10aee940dd1ab9d2b96d9e2abdfdf25ba87d..1e7c63af2e797f108682bf6c7217ba8f13fe2833 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -43,7 +43,12 @@ import org.apache.spark.rdd.RDD
  *                 input RDD's partitions).
  * @param localProperties copy of thread-local properties set by the user on the driver side.
  * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
- */
+ *
+ * The parameters below are optional:
+ * @param jobId id of the job this task belongs to
+ * @param appId id of the app this task belongs to
+ * @param appAttemptId attempt id of the app this task belongs to
+ */
 private[spark] class ResultTask[T, U](
     stageId: Int,
     stageAttemptId: Int,
@@ -52,8 +57,12 @@ private[spark] class ResultTask[T, U](
     locs: Seq[TaskLocation],
     val outputId: Int,
     localProperties: Properties,
-    metrics: TaskMetrics)
-  extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties)
+    metrics: TaskMetrics,
+    jobId: Option[Int] = None,
+    appId: Option[String] = None,
+    appAttemptId: Option[String] = None)
+  extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
+    appId, appAttemptId)
   with Serializable {
 
   @transient private[this] val preferredLocs: Seq[TaskLocation] = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 448fe02084e0d54596b4774122755502036e35fa..66d6790e168f258b8540535eb8ca752aed56d7da 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -44,6 +44,11 @@ import org.apache.spark.shuffle.ShuffleWriter
  * @param locs preferred task execution locations for locality scheduling
  * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
  * @param localProperties copy of thread-local properties set by the user on the driver side.
+ *
+ * The parameters below are optional:
+ * @param jobId id of the job this task belongs to
+ * @param appId id of the app this task belongs to
+ * @param appAttemptId attempt id of the app this task belongs to
  */
 private[spark] class ShuffleMapTask(
     stageId: Int,
@@ -52,8 +57,12 @@ private[spark] class ShuffleMapTask(
     partition: Partition,
     @transient private var locs: Seq[TaskLocation],
     metrics: TaskMetrics,
-    localProperties: Properties)
-  extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties)
+    localProperties: Properties,
+    jobId: Option[Int] = None,
+    appId: Option[String] = None,
+    appAttemptId: Option[String] = None)
+  extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties, jobId,
+    appId, appAttemptId)
   with Logging {
 
   /** A constructor used only in test suites. This does not require passing in an RDD. */
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 48daa344f3c88b885459f5a06ead13629a2a625b..9385e3c31e1e4ad42362888871d046f56b50fcca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -29,7 +29,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{AccumulatorV2, ByteBufferInputStream, ByteBufferOutputStream, Utils}
+import org.apache.spark.util._
 
 /**
  * A unit of execution. We have two kinds of Task's in Spark:
@@ -47,6 +47,11 @@ import org.apache.spark.util.{AccumulatorV2, ByteBufferInputStream, ByteBufferOu
  * @param partitionId index of the number in the RDD
  * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
  * @param localProperties copy of thread-local properties set by the user on the driver side.
+ *
+ * The parameters below are optional:
+ * @param jobId id of the job this task belongs to
+ * @param appId id of the app this task belongs to
+ * @param appAttemptId attempt id of the app this task belongs to
  */
 private[spark] abstract class Task[T](
     val stageId: Int,
@@ -54,7 +59,10 @@ private[spark] abstract class Task[T](
     val partitionId: Int,
     // The default value is only used in tests.
     val metrics: TaskMetrics = TaskMetrics.registered,
-    @transient var localProperties: Properties = new Properties) extends Serializable {
+    @transient var localProperties: Properties = new Properties,
+    val jobId: Option[Int] = None,
+    val appId: Option[String] = None,
+    val appAttemptId: Option[String] = None) extends Serializable {
 
   /**
    * Called by [[org.apache.spark.executor.Executor]] to run this task.
@@ -79,9 +87,14 @@ private[spark] abstract class Task[T](
       metrics)
     TaskContext.setTaskContext(context)
     taskThread = Thread.currentThread()
+
     if (_killed) {
       kill(interruptThread = false)
     }
+
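+    // Set a Hadoop caller context carrying the app/job/stage/task ids so that HDFS and Yarn RPCs
+    // issued while running this task can be traced back to it in the Hadoop audit logs.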
+    new CallerContext("TASK", appId, appAttemptId, jobId, Option(stageId), Option(stageAttemptId),
+      Option(taskAttemptId), Option(attemptNumber)).setCurrentContext()
+
     try {
       runTask(context)
     } catch {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index e09666c6103c6870846d8054d659b354cf961c91..caa768cfbdc6c5f2da8bbd8063f3684af01151bf 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2440,6 +2440,68 @@ private[spark] object Utils extends Logging {
   }
 }
 
+/**
+ * A utility class used to set up Spark caller contexts to HDFS and Yarn. The `context` is
+ * constructed from the parameters passed in.
+ * When Spark applications run on Yarn and HDFS, their caller contexts are written into the Yarn
+ * RM audit log and hdfs-audit.log. This helps users diagnose and understand how specific
+ * applications are impacting parts of the Hadoop system and what problems they may be creating
+ * (e.g. overloading the NameNode). As noted in HDFS-9184, for a given HDFS operation it is very
+ * helpful to track which upper-level job issued it.
+ *
+ * @param from who sets up the caller context (TASK, CLIENT, APPMASTER)
+ *
+ * The parameters below are optional:
+ * @param appId id of the app this task belongs to
+ * @param appAttemptId attempt id of the app this task belongs to
+ * @param jobId id of the job this task belongs to
+ * @param stageId id of the stage this task belongs to
+ * @param stageAttemptId attempt id of the stage this task belongs to
+ * @param taskId task id
+ * @param taskAttemptNumber task attempt number
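+ *
+ * For example, a task-level context with every field present follows the pattern
+ * `SPARK_TASK_<app>_<appAttempt>_JId_<job>_SId_<stage>_<stageAttempt>_TId_<task>_<attempt>`
+ * (the angle-bracketed placeholders are illustrative, not literal values).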
+ */
+private[spark] class CallerContext(
+    from: String,
+    appId: Option[String] = None,
+    appAttemptId: Option[String] = None,
+    jobId: Option[Int] = None,
+    stageId: Option[Int] = None,
+    stageAttemptId: Option[Int] = None,
+    taskId: Option[Long] = None,
+    taskAttemptNumber: Option[Int] = None) extends Logging {
+
+  val appIdStr = if (appId.isDefined) s"_${appId.get}" else ""
+  val appAttemptIdStr = if (appAttemptId.isDefined) s"_${appAttemptId.get}" else ""
+  val jobIdStr = if (jobId.isDefined) s"_JId_${jobId.get}" else ""
+  val stageIdStr = if (stageId.isDefined) s"_SId_${stageId.get}" else ""
+  val stageAttemptIdStr = if (stageAttemptId.isDefined) s"_${stageAttemptId.get}" else ""
+  val taskIdStr = if (taskId.isDefined) s"_TId_${taskId.get}" else ""
+  val taskAttemptNumberStr = if (taskAttemptNumber.isDefined) s"_${taskAttemptNumber.get}" else ""
+
+  val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
+    jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + taskAttemptNumberStr
+
+  /**
+   * Set up the caller context [[context]] by invoking the Hadoop
+   * [[org.apache.hadoop.ipc.CallerContext]] API, which was added in Hadoop 2.8.
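+   *
+   * Reflection is used so that Spark still builds and runs against Hadoop versions older than
+   * 2.8; when the class is present, the call below is roughly equivalent to
+   * `CallerContext.setCurrent(new CallerContext.Builder(context).build())`.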
+   */
+  def setCurrentContext(): Boolean = {
+    var succeeded = false
+    try {
+      val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
+      val builderClass = Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
+      val builderInst = builderClass.getConstructor(classOf[String]).newInstance(context)
+      val hdfsContext = builderClass.getMethod("build").invoke(builderInst)
+      callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
+      succeeded = true
+    } catch {
+      case NonFatal(e) => logInfo("Failed to set Spark caller context", e)
+    }
+    succeeded
+  }
+}
+
 /**
  * A utility class to redirect the child process's stdout or stderr.
  */
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 4715fd29375d60fcf8a01647b2449d66c5eecc86..bc28b2d9cb831537855217dd836598b0f4e66fe4 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -788,6 +788,18 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
         .set("spark.executor.instances", "1")) === 3)
   }
 
+  test("Set Spark CallerContext") {
+    val context = "test"
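+    // Hadoop's CallerContext API only exists in Hadoop 2.8+; when it is absent from the test
+    // classpath, setCurrentContext() is expected to return false rather than throw.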
+    try {
+      val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
+      assert(new CallerContext(context).setCurrentContext())
+      assert(s"SPARK_$context" ===
+        callerContext.getMethod("getCurrent").invoke(null).toString)
+    } catch {
+      case _: ClassNotFoundException =>
+        assert(!new CallerContext(context).setCurrentContext())
+    }
+  }
 
   test("encodeFileNameToURIRawPath") {
     assert(Utils.encodeFileNameToURIRawPath("abc") === "abc")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index ad50ea789a913fc2555181efe1af6a27bbb63529..aabae140af8b1cac519505a87f85c44bae3eef84 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -184,6 +184,8 @@ private[spark] class ApplicationMaster(
     try {
       val appAttemptId = client.getAttemptId()
 
+      var attemptId: Option[String] = None
+
       if (isClusterMode) {
         // Set the web ui port to be ephemeral for yarn so we don't conflict with
         // other spark processes running on the same box
@@ -196,8 +198,13 @@ private[spark] class ApplicationMaster(
         // Set this internal configuration if it is running on cluster mode, this
         // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
         System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
+
+        attemptId = Option(appAttemptId.getAttemptId.toString)
       }
 
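+      // Record this ApplicationMaster (and, in cluster mode, its attempt id) in the Hadoop
+      // caller context so Yarn RM and HDFS audit logs can attribute its requests to this app.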
+      new CallerContext("APPMASTER",
+        Option(appAttemptId.getApplicationId.toString), attemptId).setCurrentContext()
+
       logInfo("ApplicationAttemptId: " + appAttemptId)
 
       val fs = FileSystem.get(yarnConf)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 2398f0aea316a420355fba34d0aca14da7e57f87..ea4e1160b7672294020996b022cda55af00eaabc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -54,7 +54,7 @@ import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CallerContext, Utils}
 
 private[spark] class Client(
     val args: ClientArguments,
@@ -161,6 +161,8 @@ private[spark] class Client(
       reportLauncherState(SparkAppHandle.State.SUBMITTED)
       launcherBackend.setAppId(appId.toString)
 
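+      // Tag subsequent Yarn/HDFS requests from this client with a Spark caller context so they
+      // show up against this application id in the Hadoop audit logs.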
+      new CallerContext("CLIENT", Option(appId.toString)).setCurrentContext()
+
       // Verify whether the cluster has enough resources for our AM
       verifyClusterResources(newAppResponse)