Skip to content
Snippets Groups Projects
Commit a310de69 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Merge pull request #926 from kayousterhout/dynamic

Changed localProperties to use ThreadLocal (not DynamicVariable).
parents 58c7d8b1 93c42532
No related branches found
No related tags found
No related merge requests found
...@@ -27,7 +27,6 @@ import scala.collection.generic.Growable ...@@ -27,7 +27,6 @@ import scala.collection.generic.Growable
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap import scala.collection.mutable.HashMap
import scala.util.DynamicVariable
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.Path
...@@ -257,20 +256,20 @@ class SparkContext( ...@@ -257,20 +256,20 @@ class SparkContext(
private[spark] var checkpointDir: Option[String] = None private[spark] var checkpointDir: Option[String] = None
// Thread Local variable that can be used by users to pass information down the stack // Thread Local variable that can be used by users to pass information down the stack
private val localProperties = new DynamicVariable[Properties](null) private val localProperties = new ThreadLocal[Properties]
def initLocalProperties() { def initLocalProperties() {
localProperties.value = new Properties() localProperties.set(new Properties())
} }
def setLocalProperty(key: String, value: String) { def setLocalProperty(key: String, value: String) {
if (localProperties.value == null) { if (localProperties.get() == null) {
localProperties.value = new Properties() localProperties.set(new Properties())
} }
if (value == null) { if (value == null) {
localProperties.value.remove(key) localProperties.get.remove(key)
} else { } else {
localProperties.value.setProperty(key, value) localProperties.get.setProperty(key, value)
} }
} }
...@@ -724,7 +723,7 @@ class SparkContext( ...@@ -724,7 +723,7 @@ class SparkContext(
logInfo("Starting job: " + callSite) logInfo("Starting job: " + callSite)
val start = System.nanoTime val start = System.nanoTime
val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,
localProperties.value) localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint() rdd.doCheckpoint()
result result
...@@ -807,7 +806,8 @@ class SparkContext( ...@@ -807,7 +806,8 @@ class SparkContext(
val callSite = Utils.formatSparkCallSite val callSite = Utils.formatSparkCallSite
logInfo("Starting job: " + callSite) logInfo("Starting job: " + callSite)
val start = System.nanoTime val start = System.nanoTime
val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value) val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
localProperties.get)
logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
result result
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment