Skip to content
Snippets Groups Projects
Commit 3b745176 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Bug fix to pluggable closure serialization change

parent 112655f0
No related branches found
No related tags found
No related merge requests found
...@@ -42,10 +42,9 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule ...@@ -42,10 +42,9 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule
val startTime = System.currentTimeMillis val startTime = System.currentTimeMillis
val bytes = ser.serialize(task) val bytes = ser.serialize(task)
val timeTaken = System.currentTimeMillis - startTime val timeTaken = System.currentTimeMillis - startTime
logInfo("Size of task %d is %d bytes and took %d ms to serialize by %s" logInfo("Size of task %d is %d bytes and took %d ms to serialize".format(
.format(idInJob, bytes.size, timeTaken, ser.getClass.getName)) idInJob, bytes.size, timeTaken))
val deserializedTask = ser.deserialize[Task[_]]( val deserializedTask = ser.deserialize[Task[_]](bytes, currentThread.getContextClassLoader)
bytes, Thread.currentThread.getContextClassLoader)
val result: Any = deserializedTask.run(attemptId) val result: Any = deserializedTask.run(attemptId)
val accumUpdates = Accumulators.values val accumUpdates = Accumulators.values
logInfo("Finished task " + idInJob) logInfo("Finished task " + idInJob)
......
...@@ -218,7 +218,7 @@ class SimpleJob( ...@@ -218,7 +218,7 @@ class SimpleJob(
logInfo("Finished TID %s (progress: %d/%d)".format(tid, tasksFinished, numTasks)) logInfo("Finished TID %s (progress: %d/%d)".format(tid, tasksFinished, numTasks))
// Deserialize task result // Deserialize task result
val result = ser.deserialize[TaskResult[_]]( val result = ser.deserialize[TaskResult[_]](
status.getData.toByteArray) status.getData.toByteArray, getClass.getClassLoader)
sched.taskEnded(tasks(index), Success, result.value, result.accumUpdates) sched.taskEnded(tasks(index), Success, result.value, result.accumUpdates)
// Mark finished and stop if we've finished all the tasks // Mark finished and stop if we've finished all the tasks
finished(index) = true finished(index) = true
...@@ -241,7 +241,7 @@ class SimpleJob( ...@@ -241,7 +241,7 @@ class SimpleJob(
// task will never succeed on any node, so tell the scheduler about it. // task will never succeed on any node, so tell the scheduler about it.
if (status.getData != null && status.getData.size > 0) { if (status.getData != null && status.getData.size > 0) {
val reason = ser.deserialize[TaskEndReason]( val reason = ser.deserialize[TaskEndReason](
status.getData.toByteArray) status.getData.toByteArray, getClass.getClassLoader)
reason match { reason match {
case fetchFailed: FetchFailed => case fetchFailed: FetchFailed =>
logInfo("Loss was due to fetch failure from " + fetchFailed.serverUri) logInfo("Loss was due to fetch failure from " + fetchFailed.serverUri)
......
...@@ -12,7 +12,6 @@ import scala.util.Random ...@@ -12,7 +12,6 @@ import scala.util.Random
* Various utility methods used by Spark. * Various utility methods used by Spark.
*/ */
object Utils { object Utils {
def serialize[T](o: T): Array[Byte] = { def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream() val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos) val oos = new ObjectOutputStream(bos)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment