Skip to content
Snippets Groups Projects
Commit 620de2dd authored by Ismael Juma's avatar Ismael Juma
Browse files

Change currentThread to Thread.currentThread as the former is deprecated.

parent 0fba22b3
No related branches found
No related tags found
No related merge requests found
...@@ -52,20 +52,20 @@ private object Accumulators ...@@ -52,20 +52,20 @@ private object Accumulators
if (original) { if (original) {
originals(a.id) = a originals(a.id) = a
} else { } else {
val accums = localAccums.getOrElseUpdate(currentThread, Map()) val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
accums(a.id) = a accums(a.id) = a
} }
} }
// Clear the local (non-original) accumulators for the current thread // Clear the local (non-original) accumulators for the current thread
def clear: Unit = synchronized { def clear: Unit = synchronized {
localAccums.remove(currentThread) localAccums.remove(Thread.currentThread)
} }
// Get the values of the local accumulators for the current thread (by ID) // Get the values of the local accumulators for the current thread (by ID)
def values: Map[Long, Any] = synchronized { def values: Map[Long, Any] = synchronized {
val ret = Map[Long, Any]() val ret = Map[Long, Any]()
for ((id, accum) <- localAccums.getOrElse(currentThread, Map())) for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map()))
ret(id) = accum.value ret(id) = accum.value
return ret return ret
} }
......
...@@ -12,7 +12,7 @@ class JavaSerializationStream(out: OutputStream) extends SerializationStream { ...@@ -12,7 +12,7 @@ class JavaSerializationStream(out: OutputStream) extends SerializationStream {
class JavaDeserializationStream(in: InputStream) extends DeserializationStream { class JavaDeserializationStream(in: InputStream) extends DeserializationStream {
val objIn = new ObjectInputStream(in) { val objIn = new ObjectInputStream(in) {
override def resolveClass(desc: ObjectStreamClass) = override def resolveClass(desc: ObjectStreamClass) =
Class.forName(desc.getName, false, currentThread.getContextClassLoader) Class.forName(desc.getName, false, Thread.currentThread.getContextClassLoader)
} }
def readObject[T](): T = objIn.readObject().asInstanceOf[T] def readObject[T](): T = objIn.readObject().asInstanceOf[T]
......
...@@ -33,7 +33,7 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging { ...@@ -33,7 +33,7 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
val bytes = Utils.serialize(tasks(i)) val bytes = Utils.serialize(tasks(i))
logInfo("Size of task " + i + " is " + bytes.size + " bytes") logInfo("Size of task " + i + " is " + bytes.size + " bytes")
val deserializedTask = Utils.deserialize[Task[_]]( val deserializedTask = Utils.deserialize[Task[_]](
bytes, currentThread.getContextClassLoader) bytes, Thread.currentThread.getContextClassLoader)
val result: Any = deserializedTask.run(myAttemptId) val result: Any = deserializedTask.run(myAttemptId)
val accumUpdates = Accumulators.values val accumUpdates = Accumulators.values
logInfo("Finished task " + i) logInfo("Finished task " + i)
......
...@@ -27,7 +27,7 @@ extends Job(jobId) with Logging ...@@ -27,7 +27,7 @@ extends Job(jobId) with Logging
// Maximum times a task is allowed to fail before failing the job // Maximum times a task is allowed to fail before failing the job
val MAX_TASK_FAILURES = 4 val MAX_TASK_FAILURES = 4
val callingThread = currentThread val callingThread = Thread.currentThread
val tasks = tasksSeq.toArray val tasks = tasksSeq.toArray
val numTasks = tasks.length val numTasks = tasks.length
val launched = new Array[Boolean](numTasks) val launched = new Array[Boolean](numTasks)
......
...@@ -182,7 +182,7 @@ extends Logging with Serializable { ...@@ -182,7 +182,7 @@ extends Logging with Serializable {
private def byteArrayToObject[OUT](bytes: Array[Byte]): OUT = { private def byteArrayToObject[OUT](bytes: Array[Byte]): OUT = {
val in = new ObjectInputStream (new ByteArrayInputStream (bytes)){ val in = new ObjectInputStream (new ByteArrayInputStream (bytes)){
override def resolveClass(desc: ObjectStreamClass) = override def resolveClass(desc: ObjectStreamClass) =
Class.forName(desc.getName, false, currentThread.getContextClassLoader) Class.forName(desc.getName, false, Thread.currentThread.getContextClassLoader)
} }
val retVal = in.readObject.asInstanceOf[OUT] val retVal = in.readObject.asInstanceOf[OUT]
in.close() in.close()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment