Skip to content
Snippets Groups Projects
Commit a2ea069a authored by Reynold Xin's avatar Reynold Xin
Browse files

Merge pull request #937 from jerryshao/localProperties-fix

Fix PR926 local properties issues in Spark Streaming like scenarios
parents f06f2da2 aa0c29f7
No related branches found
No related tags found
No related merge requests found
......@@ -256,7 +256,9 @@ class SparkContext(
private[spark] var checkpointDir: Option[String] = None
// Thread Local variable that can be used by users to pass information down the stack
private val localProperties = new ThreadLocal[Properties]
private val localProperties = new InheritableThreadLocal[Properties] {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
}
def initLocalProperties() {
localProperties.set(new Properties())
......@@ -273,6 +275,9 @@ class SparkContext(
}
}
def getLocalProperty(key: String): String =
Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
/** Set a human readable description of the current job. */
def setJobDescription(value: String) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
......
......@@ -40,7 +40,7 @@ object ThreadingSuiteState {
}
class ThreadingSuite extends FunSuite with LocalSparkContext {
test("accessing SparkContext form a different thread") {
sc = new SparkContext("local", "test")
val nums = sc.parallelize(1 to 10, 2)
......@@ -149,4 +149,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
fail("One or more threads didn't see runningThreads = 4")
}
}
test("set local properties in different thread") {
sc = new SparkContext("local", "test")
val sem = new Semaphore(0)
val threads = (1 to 5).map { i =>
new Thread() {
override def run() {
sc.setLocalProperty("test", i.toString)
assert(sc.getLocalProperty("test") === i.toString)
sem.release()
}
}
}
threads.foreach(_.start())
sem.acquire(5)
assert(sc.getLocalProperty("test") === null)
}
test("set and get local properties in parent-children thread") {
sc = new SparkContext("local", "test")
sc.setLocalProperty("test", "parent")
val sem = new Semaphore(0)
val threads = (1 to 5).map { i =>
new Thread() {
override def run() {
assert(sc.getLocalProperty("test") === "parent")
sc.setLocalProperty("test", i.toString)
assert(sc.getLocalProperty("test") === i.toString)
sem.release()
}
}
}
threads.foreach(_.start())
sem.acquire(5)
assert(sc.getLocalProperty("test") === "parent")
assert(sc.getLocalProperty("Foo") === null)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment