diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 72cac42cd2b2bb507c282c0945209394d42a32ad..aba713cb4267a94f5168d36d580bd1a8220d9b9d 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -43,9 +43,8 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  * :: DeveloperApi ::
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
  * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
- * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
- * objects needs to have the right SparkEnv set. You can get the current environment with
- * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
+ * Spark code finds the SparkEnv through a global variable, so all threads can access the same
+ * SparkEnv. It can be accessed with SparkEnv.get (e.g. after creating a SparkContext).
  *
  * NOTE: This is not intended for external use. This is exposed for Shark and may be made private
  *       in a future release.
@@ -119,30 +118,28 @@ class SparkEnv (
 }
 
 object SparkEnv extends Logging {
-  private val env = new ThreadLocal[SparkEnv]
-  @volatile private var lastSetSparkEnv : SparkEnv = _
+  @volatile private var env: SparkEnv = _
 
   private[spark] val driverActorSystemName = "sparkDriver"
   private[spark] val executorActorSystemName = "sparkExecutor"
 
   def set(e: SparkEnv) {
-    lastSetSparkEnv = e
-    env.set(e)
+    env = e
   }
 
   /**
-   * Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
-   * previously set in any thread.
+   * Returns the SparkEnv.
    */
   def get: SparkEnv = {
-    Option(env.get()).getOrElse(lastSetSparkEnv)
+    env
   }
 
   /**
-   * Returns the ThreadLocal SparkEnv.
+   * Returns the SparkEnv; it is no longer thread-local.
    */
+  @deprecated("Use SparkEnv.get instead", "1.2")
   def getThreadLocal: SparkEnv = {
-    env.get()
+    env
   }
 
   private[spark] def create(
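
A minimal sketch of the semantics this hunk deletes (hypothetical names, not the real Spark code): under the old scheme, `get` fell back to whichever env was set last by any thread, so the ThreadLocal provided no real isolation, and the single global in the new code only makes that explicit.

    // Sketch of the OLD two-field lookup being removed above (simplified).
    object OldStyleEnv {
      private val local = new ThreadLocal[String]
      @volatile private var lastSet: String = _

      def set(e: String): Unit = { lastSet = e; local.set(e) }

      // Threads that never called set still observe the env most recently
      // set by ANY thread, via the fallback.
      def get: String = Option(local.get()).getOrElse(lastSet)
    }

    object OldStyleEnvDemo {
      def main(args: Array[String]): Unit = {
        OldStyleEnv.set("env-from-main")
        val t = new Thread {
          // This thread never called set, yet it prints "env-from-main"
          // through lastSet, i.e. get was already effectively global.
          override def run(): Unit = println(OldStyleEnv.get)
        }
        t.start()
        t.join()
      }
    }
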
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 924141475383db7bf6b2c07c6c05ffa7051b6ac1..ad6eb9ef5027789cb7c52c5ffde989f80658ef50 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -196,7 +196,6 @@ private[spark] class PythonRDD(
 
     override def run(): Unit = Utils.logUncaughtExceptions {
       try {
-        SparkEnv.set(env)
         val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
         val dataOut = new DataOutputStream(stream)
         // Partition index
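
This removal, and the identical ones in Executor, PipedRDD, DAGScheduler, TaskSchedulerImpl, JobGenerator, JobScheduler, and ReceiverTracker below, all rest on the same point: a freshly spawned thread can now read the global directly, with no per-thread handoff. A hedged sketch of the resulting pattern (the helper name is invented; this is not the actual PythonRDD code):

    import org.apache.spark.SparkEnv

    // Hypothetical helper showing the post-change pattern: worker threads
    // read SparkEnv.get themselves instead of receiving it via SparkEnv.set.
    def startWorkerThread(name: String)(body: SparkEnv => Unit): Thread = {
      val t = new Thread(name) {
        // SparkEnv.env is @volatile, so this read sees the driver's write.
        override def run(): Unit = body(SparkEnv.get)
      }
      t.start()
      t
    }

    // e.g. startWorkerThread("stdin writer") { env => /* use env.serializer */ }
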
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9bbfcdc4a0b6e4699b2980303e7477d633bb5997..616c7e6a4636825c48cf278c54434353b6981994 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -148,7 +148,6 @@ private[spark] class Executor(
 
     override def run() {
       val startTime = System.currentTimeMillis()
-      SparkEnv.set(env)
       Thread.currentThread.setContextClassLoader(replClassLoader)
       val ser = SparkEnv.get.closureSerializer.newInstance()
       logInfo(s"Running $taskName (TID $taskId)")
@@ -158,7 +157,6 @@ private[spark] class Executor(
       val startGCTime = gcTime
 
       try {
-        SparkEnv.set(env)
         Accumulators.clear()
         val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
         updateDependencies(taskFiles, taskJars)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 5d77d3737845886abfa4130c733c886567ba893b..56ac7a69be0d3ce85a541c4433a5424168ae931a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -131,7 +131,6 @@ private[spark] class PipedRDD[T: ClassTag](
     // Start a thread to feed the process input from our parent's iterator
     new Thread("stdin writer for " + command) {
       override def run() {
-        SparkEnv.set(env)
         val out = new PrintWriter(proc.getOutputStream)
 
         // input the pipe context firstly
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8135cdbb4c31fd7b6e312e581f7e0673de6977cc..788eb1ff4e455f713bcff3419da6f6bb83c399c0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -630,7 +630,6 @@ class DAGScheduler(
   protected def runLocallyWithinThread(job: ActiveJob) {
     var jobResult: JobResult = JobSucceeded
     try {
-      SparkEnv.set(env)
       val rdd = job.finalStage.rdd
       val split = rdd.partitions(job.partitions(0))
       val taskContext =
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 4dc550413c13c47624451d503ceec29b2fa5d592..6d697e3d003f66f1533517361a33100f92f21cf3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -216,8 +216,6 @@ private[spark] class TaskSchedulerImpl(
    * that tasks are balanced across the cluster.
    */
   def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
-    SparkEnv.set(sc.env)
-
     // Mark each slave as alive and remember its hostname
     // Also track if new executor is added
     var newExecAvail = false
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 374848358e7008c9123d6ac9a15e3114a621c8b2..7d73ada12d107f67ea0e2609aa1331c8654a8933 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -217,7 +217,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
   /** Generate jobs and perform checkpoint for the given `time`.  */
   private def generateJobs(time: Time) {
-    SparkEnv.set(ssc.env)
     Try(graph.generateJobs(time)) match {
       case Success(jobs) =>
         val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 1b034b9fb187c729fad8e416edaa3eea498d61bb..cfa3cd8925c807c9f36192980f7fa648272d4b20 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -138,7 +138,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     }
     jobSet.handleJobStart(job)
     logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
-    SparkEnv.set(ssc.env)
   }
 
   private def handleJobCompletion(job: Job) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 5307fe189d717b181075bc85010521f0947ee8b4..7149dbc12a36540c740ca667d091d3a7792b06f9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -202,7 +202,6 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
     @transient val thread  = new Thread() {
       override def run() {
         try {
-          SparkEnv.set(env)
           startReceivers()
         } catch {
           case ie: InterruptedException => logInfo("ReceiverLauncher interrupted")
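
Closing the loop, a self-contained demo (plain Scala, no Spark dependency, invented names) of why one `@volatile` field suffices for all of the call sites above: writes to it are visible to every thread without per-thread registration.

    // Standalone model of the new SparkEnv storage: a single @volatile var.
    object GlobalEnvDemo {
      @volatile private var env: String = _
      def set(e: String): Unit = { env = e }
      def get: String = env

      def main(args: Array[String]): Unit = {
        set("driver-env")
        val t = new Thread {
          // No set() call in this thread. Thread.start() already orders the
          // write above before this read; @volatile additionally makes any
          // later set() from another thread visible here.
          override def run(): Unit = println("child thread sees: " + get)
        }
        t.start()
        t.join()
      }
    }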