Commit d8bf00c1 authored by skeirik2

modified task data-struct

parent 0f2f3c03
ResultTask.scala
@@ -60,9 +60,11 @@ private[spark] class ResultTask[T, U](
     serializedTaskMetrics: Array[Byte],
     jobId: Option[Int] = None,
     appId: Option[String] = None,
-    appAttemptId: Option[String] = None)
+    appAttemptId: Option[String] = None,
+    depMap: HashMap[Int, Set[Int]] = null,
+    curRunningRddMap: HashMap[Int, Set[Int]] = null)
   extends Task[U](stageId, stageAttemptId, partition.index, localProperties, serializedTaskMetrics,
-    jobId, appId, appAttemptId)
+    jobId, appId, appAttemptId, depMap, curRunningRddMap)
   with Serializable {
 
   @transient private[this] val preferredLocs: Seq[TaskLocation] = {
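The commit message does not say what the two new constructor parameters encode. Given the names and the HashMap[Int, Set[Int]] type, one plausible reading (an assumption, not stated in the commit) is a map keyed by RDD id: depMap relating each RDD to the ids of its parent RDDs, and curRunningRddMap tracking which RDDs are currently being computed. A hypothetical helper along those lines, using only the public RDD.id and RDD.dependencies API, might look like the sketch below; buildDepMap is not part of this commit.

import scala.collection.mutable.HashMap
import org.apache.spark.rdd.RDD

object DepMapSketch {
  // Hypothetical helper, not part of this commit: fill a HashMap[Int, Set[Int]]
  // keyed by RDD id, mapping each RDD in a lineage to the ids of its direct
  // parent RDDs. This is only one possible interpretation of depMap.
  def buildDepMap(root: RDD[_]): HashMap[Int, Set[Int]] = {
    val depMap = new HashMap[Int, Set[Int]]
    def visit(rdd: RDD[_]): Unit = {
      if (!depMap.contains(rdd.id)) {
        depMap(rdd.id) = rdd.dependencies.map(_.rdd.id).toSet
        rdd.dependencies.foreach(dep => visit(dep.rdd))
      }
    }
    visit(root)
    depMap
  }
}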
ShuffleMapTask.scala
@@ -60,7 +60,10 @@ private[spark] class ShuffleMapTask(
     serializedTaskMetrics: Array[Byte],
     jobId: Option[Int] = None,
     appId: Option[String] = None,
-    appAttemptId: Option[String] = None)
+    appAttemptId: Option[String] = None,
+    depMap: HashMap[Int, Set[Int]] = null,
+    curRunningRddMap: HashMap[Int, Set[Int]] = null
+    )
   extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties,
     serializedTaskMetrics, jobId, appId, appAttemptId)
   with Logging {
Task.scala
@@ -60,7 +60,10 @@ private[spark] abstract class Task[T](
     SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(),
     val jobId: Option[Int] = None,
     val appId: Option[String] = None,
-    val appAttemptId: Option[String] = None) extends Serializable {
+    val appAttemptId: Option[String] = None,
+    var depMap: HashMap[Int, Set[Int]] = new HashMap[Int, Set[Int]],
+    var curRunningRddMap: HashMap[Int, Set[Int]] =
+      new HashMap[Int, Set[Int]]) extends Serializable {
 
   @transient lazy val metrics: TaskMetrics =
     SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(serializedTaskMetrics))
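Note that the defaults differ across the hierarchy: ResultTask and ShuffleMapTask default the new parameters to null, while the Task base class defaults them to empty HashMaps. Because ResultTask forwards its own arguments to Task, a ResultTask built without explicit maps ends up with null fields; ShuffleMapTask does not forward them, so it always gets the base-class empty maps. A toy sketch (deliberately not Spark code, all names invented) of how these defaults interact:

import scala.collection.mutable.HashMap

// Toy model of the constructor pattern in this commit.
abstract class BaseTask(
    val id: Int,
    var depMap: HashMap[Int, Set[Int]] = new HashMap[Int, Set[Int]],
    var curRunningRddMap: HashMap[Int, Set[Int]] = new HashMap[Int, Set[Int]])
  extends Serializable

// Forwards its own defaults (null) to the base class, like ResultTask here.
class ForwardingTask(
    id: Int,
    depMap: HashMap[Int, Set[Int]] = null,
    curRunningRddMap: HashMap[Int, Set[Int]] = null)
  extends BaseTask(id, depMap, curRunningRddMap)

// Does not forward the maps, like ShuffleMapTask here, so the base defaults apply.
class NonForwardingTask(id: Int) extends BaseTask(id)

object DefaultsDemo {
  def main(args: Array[String]): Unit = {
    println(new ForwardingTask(1).depMap)    // null: the subclass default wins
    println(new NonForwardingTask(2).depMap) // an empty map from the base default
  }
}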