Skip to content
Snippets Groups Projects
Commit 92a4c2a5 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Fixing bug in local scheduler time recording

parent ee692482
Branches sams_playpen2
No related tags found
No related merge requests found
...@@ -153,10 +153,10 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: ...@@ -153,10 +153,10 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
// this adds a bit of unnecessary overhead but matches how the Mesos Executor works. // this adds a bit of unnecessary overhead but matches how the Mesos Executor works.
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes) val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes)
updateDependencies(taskFiles, taskJars) // Download any files added with addFile updateDependencies(taskFiles, taskJars) // Download any files added with addFile
val deserStart = System.currentTimeMillis() val taskStart = System.currentTimeMillis()
val deserializedTask = ser.deserialize[Task[_]]( val deserializedTask = ser.deserialize[Task[_]](
taskBytes, Thread.currentThread.getContextClassLoader) taskBytes, Thread.currentThread.getContextClassLoader)
val deserTime = System.currentTimeMillis() - deserStart val deserTime = System.currentTimeMillis() - taskStart
// Run it // Run it
val result: Any = deserializedTask.run(taskId) val result: Any = deserializedTask.run(taskId)
...@@ -170,8 +170,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: ...@@ -170,8 +170,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
val resultToReturn = ser.deserialize[Any](serResult) val resultToReturn = ser.deserialize[Any](serResult)
val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]]( val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
ser.serialize(Accumulators.values)) ser.serialize(Accumulators.values))
val serviceTime = System.currentTimeMillis() - taskStart
logInfo("Finished " + taskId) logInfo("Finished " + taskId)
deserializedTask.metrics.get.executorRunTime = deserTime.toInt//info.duration.toInt //close enough deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null)) val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment