From 8028f3a0b4003af15ed44d9ef4727b56f4b10534 Mon Sep 17 00:00:00 2001
From: Marcin Tustin <marcin.tustin@gmail.com>
Date: Mon, 2 May 2016 19:37:57 -0700
Subject: [PATCH] [SPARK-14685][CORE] Document heritability of localProperties

## What changes were proposed in this pull request?

This updates the java-/scala- doc for setLocalProperty to document heritability of localProperties. This also adds tests for that behaviour.

## How was this patch tested?

Tests pass. New tests were added.

Author: Marcin Tustin <marcin.tustin@gmail.com>

Closes #12455 from marcintustin/SPARK-14685.
---
 .../scala/org/apache/spark/SparkContext.scala |  5 ++++
 .../spark/api/java/JavaSparkContext.scala     |  9 ++++--
 .../org/apache/spark/SparkContextSuite.scala  | 28 +++++++++++++++++++
 3 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 302dec25c6..58618b4192 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -608,6 +608,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * scheduler pool. User-defined properties may also be set here. These properties are propagated
    * through to worker tasks and can be accessed there via
    * [[org.apache.spark.TaskContext#getLocalProperty]].
+   *
+   * These properties are inherited by child threads spawned from this thread. This
+   * may have unexpected consequences when working with thread pools. The standard java
+   * implementation of thread pools have worker threads spawn other worker threads.
+   * As a result, local properties may propagate unpredictably.
    */
   def setLocalProperty(key: String, value: String) {
     if (value == null) {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index dfd91ae338..fb6323413e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -712,8 +712,13 @@ class JavaSparkContext(val sc: SparkContext)
   }
 
   /**
-   * Set a local property that affects jobs submitted from this thread, such as the
-   * Spark fair scheduler pool.
+   * Set a local property that affects jobs submitted from this thread, and all child
+   * threads, such as the Spark fair scheduler pool.
+   *
+   * These properties are inherited by child threads spawned from this thread. This
+   * may have unexpected consequences when working with thread pools. The standard java
+   * implementation of thread pools have worker threads spawn other worker threads.
+   * As a result, local properties may propagate unpredictably.
    */
   def setLocalProperty(key: String, value: String): Unit = sc.setLocalProperty(key, value)
 
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index a759f364fe..63987084ff 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -323,4 +323,32 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
       assert(sc.getConf.getInt("spark.executor.instances", 0) === 6)
     }
   }
+
+
+  test("localProperties are inherited by spawned threads.") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+    sc.setLocalProperty("testProperty", "testValue")
+    var result = "unset";
+    val thread = new Thread() { override def run() = {result = sc.getLocalProperty("testProperty")}}
+    thread.start()
+    thread.join()
+    sc.stop()
+    assert(result == "testValue")
+  }
+
+  test("localProperties do not cross-talk between threads.") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+    var result = "unset";
+    val thread1 = new Thread() {
+      override def run() = {sc.setLocalProperty("testProperty", "testValue")}}
+    // testProperty should be unset and thus return null
+    val thread2 = new Thread() {
+      override def run() = {result = sc.getLocalProperty("testProperty")}}
+    thread1.start()
+    thread1.join()
+    thread2.start()
+    thread2.join()
+    sc.stop()
+    assert(result == null)
+  }
 }
-- 
GitLab