Skip to content
Snippets Groups Projects
Commit 9821cd4d authored by Reynold Xin's avatar Reynold Xin
Browse files

Force serialize/deserialize task results in local execution mode.

parent f709b3ad
No related branches found
No related tags found
No related merge requests found
......@@ -46,9 +46,15 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule
idInJob, bytes.size, timeTaken))
val deserializedTask = ser.deserialize[Task[_]](bytes, currentThread.getContextClassLoader)
val result: Any = deserializedTask.run(attemptId)
// Serialize and deserialize the result to emulate what the mesos
// executor does. This is useful to catch serialization errors early
// on in development (so when users move their local Spark programs
// to the cluster, they don't get surprised by serialization errors).
val resultToReturn = ser.deserialize[Any](ser.serialize(result))
val accumUpdates = Accumulators.values
logInfo("Finished task " + idInJob)
taskEnded(task, Success, result, accumUpdates)
taskEnded(task, Success, resultToReturn, accumUpdates)
} catch {
case t: Throwable => {
logError("Exception in task " + idInJob, t)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment