diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index a80853614614247fde70754a91c3150272ef8983..094a95d70ea8af6590a439a1d1a0e25cf3ac4d23 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -52,20 +52,20 @@ private object Accumulators
     if (original) {
       originals(a.id) = a
     } else {
-      val accums = localAccums.getOrElseUpdate(currentThread, Map())
+      val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
       accums(a.id) = a
     }
   }
 
   // Clear the local (non-original) accumulators for the current thread
   def clear: Unit = synchronized {
-    localAccums.remove(currentThread)
+    localAccums.remove(Thread.currentThread)
   }
 
   // Get the values of the local accumulators for the current thread (by ID)
   def values: Map[Long, Any] = synchronized {
     val ret = Map[Long, Any]()
-    for ((id, accum) <- localAccums.getOrElse(currentThread, Map()))
+    for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map()))
       ret(id) = accum.value
     return ret
   }
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala
index af390d55d863f598c0bfcade83388a30b0de1228..e7cd4364ee0890bcf85e6bbf4be90eb5c297076c 100644
--- a/core/src/main/scala/spark/JavaSerializer.scala
+++ b/core/src/main/scala/spark/JavaSerializer.scala
@@ -12,7 +12,7 @@ class JavaSerializationStream(out: OutputStream) extends SerializationStream {
 class JavaDeserializationStream(in: InputStream) extends DeserializationStream {
   val objIn = new ObjectInputStream(in) {
     override def resolveClass(desc: ObjectStreamClass) =
-      Class.forName(desc.getName, false, currentThread.getContextClassLoader)
+      Class.forName(desc.getName, false, Thread.currentThread.getContextClassLoader)
   }
 
   def readObject[T](): T = objIn.readObject().asInstanceOf[T]
diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala
index e43516c84bd3792c0738c0734c3ebf471f474e73..1044bf18aa07cf9b13d2cbf6071db1127f521ac1 100644
--- a/core/src/main/scala/spark/LocalScheduler.scala
+++ b/core/src/main/scala/spark/LocalScheduler.scala
@@ -33,7 +33,7 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
         val bytes = Utils.serialize(tasks(i))
         logInfo("Size of task " + i + " is " + bytes.size + " bytes")
         val deserializedTask = Utils.deserialize[Task[_]](
-          bytes, currentThread.getContextClassLoader)
+          bytes, Thread.currentThread.getContextClassLoader)
         val result: Any = deserializedTask.run(myAttemptId)
         val accumUpdates = Accumulators.values
         logInfo("Finished task " + i)
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index 2001205878e65826886801efb8f61028b4595d11..9eee747cfdf5e422fc2c7f9fa57f644ab1b2b0e1 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -27,7 +27,7 @@ extends Job(jobId) with Logging
   // Maximum times a task is allowed to fail before failing the job
   val MAX_TASK_FAILURES = 4
 
-  val callingThread = currentThread
+  val callingThread = Thread.currentThread
   val tasks = tasksSeq.toArray
   val numTasks = tasks.length
   val launched = new Array[Boolean](numTasks)
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index 837129c665c6793eb76772e5b54055f05fdc5d2f..f492ca762c4092f63bc72e6c9dd90cb4b12d1936 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -182,7 +182,7 @@ extends Logging with Serializable {
   private def byteArrayToObject[OUT](bytes: Array[Byte]): OUT = {
     val in = new ObjectInputStream (new ByteArrayInputStream (bytes)){
       override def resolveClass(desc: ObjectStreamClass) =
-        Class.forName(desc.getName, false, currentThread.getContextClassLoader)
+        Class.forName(desc.getName, false, Thread.currentThread.getContextClassLoader)
     }
     val retVal = in.readObject.asInstanceOf[OUT]
     in.close()